tor_proto/congestion/
rtt.rs

1//! Round Trip Time measurement (§ 2.1)
2
3use std::cmp::{max, min};
4use std::collections::VecDeque;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::{Duration, Instant};
7
8use super::params::RoundTripEstimatorParams;
9use super::{CongestionWindow, State};
10
11use thiserror::Error;
12use tor_error::{ErrorKind, HasKind};
13
14/// An error originating from the tor-congestion crate.
15#[derive(Error, Debug, Clone)]
16#[non_exhaustive]
17pub(crate) enum Error {
18    /// A call to `RoundtripTimeEstimator::sendme_received` was made without calling
19    /// `RoundtripTimeEstimator::expect_sendme` first.
20    #[error("Informed of a SENDME we weren't expecting")]
21    MismatchedEstimationCall,
22}
23
24impl HasKind for Error {
25    fn kind(&self) -> ErrorKind {
26        use Error as E;
27        match self {
28            E::MismatchedEstimationCall => ErrorKind::TorProtocolViolation,
29        }
30    }
31}
32
33/// Provides an estimate of the round-trip time (RTT) of a Tor circuit.
34#[derive(Debug)]
35#[allow(dead_code)]
36pub(crate) struct RoundtripTimeEstimator {
37    /// A queue of times we sent a cell that we'd expect a SENDME for.
38    ///
39    /// When a data cell is sent and for which we expect a SENDME next, the timestamp at the send
40    /// is kept in this queue so we can use it to measure the RTT when the SENDME is received.
41    ///
42    /// A queue is used here because the protocol allows to send all pending SENDMEs at once as
43    /// long as it is within one congestion window.
44    sendme_expected_from: VecDeque<Instant>,
45    /// The last *measured* round-trip time.
46    last_rtt: Duration,
47    /// The current smoothed *estimate* of what the round-trip time is.
48    ///
49    /// This is zero iff we have not managed to get any estimate yet.
50    ewma_rtt: Duration,
51    /// The minimum observed value of `last_rtt`.
52    min_rtt: Duration,
53    /// The maximum observed value of `last_rtt`.
54    max_rtt: Duration,
55    /// The network parameters we're using.
56    params: RoundTripEstimatorParams,
57    /// A reference to a shared boolean for storing if the clock is stalled or not.
58    /// Spec: CLOCK_HEURISTICS from prop324. See is_clock_stalled() for the implementation.
59    clock_stalled: AtomicBool,
60}
61
62#[allow(dead_code)]
63impl RoundtripTimeEstimator {
64    /// Create a new `RoundtripTimeEstimator`, using a set of `NetParameters` and a shared boolean
65    /// to cache clock stalled state in.
66    pub(crate) fn new(params: &RoundTripEstimatorParams) -> Self {
67        Self {
68            sendme_expected_from: Default::default(),
69            last_rtt: Default::default(),
70            ewma_rtt: Default::default(),
71            min_rtt: Duration::ZERO,
72            max_rtt: Default::default(),
73            params: params.clone(),
74            clock_stalled: AtomicBool::default(),
75        }
76    }
77
78    /// Return true iff the estimator is ready to be used or read.
79    pub(crate) fn is_ready(&self) -> bool {
80        !self.clock_stalled() && !self.last_rtt.is_zero()
81    }
82
83    /// Return the state of the clock stalled indicator.
84    pub(crate) fn clock_stalled(&self) -> bool {
85        self.clock_stalled.load(Ordering::SeqCst)
86    }
87
88    /// Return the EWMA RTT in usec or u32 MAX if we don't have an estimate yet.
89    pub(crate) fn ewma_rtt_usec(&self) -> u32 {
90        u32::try_from(self.ewma_rtt.as_micros()).unwrap_or(u32::MAX)
91    }
92
93    /// Return the Minimum RTT in usec or u32 MAX value if we don't have an estimate yet.
94    pub(crate) fn min_rtt_usec(&self) -> u32 {
95        u32::try_from(self.min_rtt.as_micros()).unwrap_or(u32::MAX)
96    }
97
98    /// Inform the estimator that we did (at time `now`) something that we'll expect a SENDME to
99    /// be received for.
100    pub(crate) fn expect_sendme(&mut self, now: Instant) {
101        self.sendme_expected_from.push_back(now);
102    }
103
104    /// Return whether we can use heuristics to sanity-check RTT values against our EWMA value.
105    /// Spec: 2.1.1. Clock Jump Heuristics CLOCK_HEURISTICS
106    ///
107    /// Used in [`is_clock_stalled`](RoundtripTimeEstimator::is_clock_stalled), to check the sanity of
108    /// a newly measured RTT value.
109    fn can_crosscheck_with_current_estimate(&self, in_slow_start: bool) -> bool {
110        // If we're in slow start, we don't perform any sanity checks, as per spec. If we don't
111        // have a current estimate, we can't use it for sanity checking, because it doesn't
112        // exist.
113        !(in_slow_start || self.ewma_rtt.is_zero())
114    }
115
116    /// Given a raw RTT value we just observed, compute whether or not we think the clock has
117    /// stalled or jumped, and we should throw it out as a result.
118    fn is_clock_stalled(&self, raw_rtt: Duration, in_slow_start: bool) -> bool {
119        if raw_rtt.is_zero() {
120            // Clock is stalled.
121            self.clock_stalled.store(true, Ordering::SeqCst);
122            true
123        } else if self.can_crosscheck_with_current_estimate(in_slow_start) {
124            /// Discrepancy ratio of a new RTT value that we allow against the current RTT in order
125            /// to declare if the clock has stalled or not. This value is taken from proposal 324
126            /// section 2.1.1 CLOCK_HEURISTICS and has the same name as in C-tor.
127            const DELTA_DISCREPANCY_RATIO_MAX: u32 = 5000;
128            // If we have enough data, check the sanity of our measurement against our EWMA value.
129            if raw_rtt > self.ewma_rtt * DELTA_DISCREPANCY_RATIO_MAX {
130                // The clock significantly jumped forward.
131                //
132                // Don't update the global cache, though, since this is triggerable over the
133                // network.
134                //
135                // FIXME(eta): We should probably log something here?
136                true
137            } else if self.ewma_rtt > raw_rtt * DELTA_DISCREPANCY_RATIO_MAX {
138                // The clock might have stalled. We can't really make a decision just off this
139                // one measurement, though, so we'll use the stored stall value.
140                self.clock_stalled.load(Ordering::SeqCst)
141            } else {
142                // If we got here, we're not stalled.
143                self.clock_stalled.store(false, Ordering::SeqCst);
144                false
145            }
146        } else {
147            // If we don't have enough measurements to sanity check, assume it's okay.
148            false
149        }
150    }
151
152    /// Update the estimator on time `now` and at the congestion window `cwnd`.
153    ///
154    /// # Errors
155    ///
156    /// Each call to this function removes an entry from `sendme_expected_from` (the entries are
157    /// added using [`sendme_expected_from`](Self::sendme_expected_from)).
158    ///
159    /// Returns an error if are not expecting any SENDMEs at this time (if `expect_sendme` was
160    /// never called, or if we have exhausted all `sendme_expected_from` added by previous
161    /// `expect_sendme` calls).
162    ///
163    /// Spec: prop324 section 2.1 C-tor: congestion_control_update_circuit_rtt() in
164    /// congestion_control_common.c
165    pub(crate) fn update(
166        &mut self,
167        now: Instant,
168        state: &State,
169        cwnd: &CongestionWindow,
170    ) -> Result<(), Error> {
171        let data_sent_at = self
172            .sendme_expected_from
173            .pop_front()
174            .ok_or(Error::MismatchedEstimationCall)?;
175        let raw_rtt = now.saturating_duration_since(data_sent_at);
176
177        if self.is_clock_stalled(raw_rtt, state.in_slow_start()) {
178            return Ok(());
179        }
180
181        self.max_rtt = self.max_rtt.max(raw_rtt);
182        self.last_rtt = raw_rtt;
183
184        // This is the "N" for N-EWMA.
185        let ewma_n = u64::from(if state.in_slow_start() {
186            self.params.ewma_ss_max()
187        } else {
188            min(
189                (cwnd.update_rate(state) * (self.params.ewma_cwnd_pct().as_percent())) / 100,
190                self.params.ewma_max(),
191            )
192        });
193        let ewma_n = max(ewma_n, 2);
194
195        // Get the USEC values.
196        let raw_rtt_usec = raw_rtt.as_micros() as u64;
197        let prev_ewma_rtt_usec = self.ewma_rtt.as_micros() as u64;
198
199        // This is the actual EWMA calculation.
200        // C-tor simplifies this as follows for rounding error reasons:
201        //
202        // EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1)
203        //      = (value*2 + EWMA_prev*(N-1))/(N+1)
204        //
205        // Spec: prop324 section 2.1.2 (N_EWMA_SMOOTHING)
206        let new_ewma_rtt_usec = if prev_ewma_rtt_usec == 0 {
207            raw_rtt_usec
208        } else {
209            ((raw_rtt_usec * 2) + ((ewma_n - 1) * prev_ewma_rtt_usec)) / (ewma_n + 1)
210        };
211        self.ewma_rtt = Duration::from_micros(new_ewma_rtt_usec);
212
213        if self.min_rtt.is_zero() {
214            self.min_rtt = self.ewma_rtt;
215        } else if cwnd.get() == cwnd.min() && !state.in_slow_start() {
216            // The cast is OK even if lossy, we only care about the usec level.
217            let max = max(self.ewma_rtt, self.min_rtt).as_micros() as u64;
218            let min = min(self.ewma_rtt, self.min_rtt).as_micros() as u64;
219            let rtt_reset_pct = u64::from(self.params.rtt_reset_pct().as_percent());
220            self.min_rtt = Duration::from_micros(
221                (rtt_reset_pct * max / 100) + (100 - rtt_reset_pct) * min / 100,
222            );
223        } else if self.ewma_rtt < self.min_rtt {
224            self.min_rtt = self.ewma_rtt;
225        }
226
227        Ok(())
228    }
229}
230
231#[cfg(test)]
232#[allow(clippy::print_stderr)]
233mod test {
234    // @@ begin test lint list maintained by maint/add_warning @@
235    #![allow(clippy::bool_assert_comparison)]
236    #![allow(clippy::clone_on_copy)]
237    #![allow(clippy::dbg_macro)]
238    #![allow(clippy::mixed_attributes_style)]
239    #![allow(clippy::print_stderr)]
240    #![allow(clippy::print_stdout)]
241    #![allow(clippy::single_char_pattern)]
242    #![allow(clippy::unwrap_used)]
243    #![allow(clippy::unchecked_duration_subtraction)]
244    #![allow(clippy::useless_vec)]
245    #![allow(clippy::needless_pass_by_value)]
246    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
247
248    use std::time::{Duration, Instant};
249
250    use crate::congestion::test_utils::{new_cwnd, new_rtt_estimator};
251
252    use super::*;
253
254    #[derive(Debug)]
255    struct RttTestSample {
256        sent_usec_in: u64,
257        sendme_received_usec_in: u64,
258        cwnd_in: u32,
259        ss_in: bool,
260        last_rtt_usec_out: u64,
261        ewma_rtt_usec_out: u64,
262        min_rtt_usec_out: u64,
263    }
264
265    impl From<[u64; 7]> for RttTestSample {
266        fn from(arr: [u64; 7]) -> Self {
267            Self {
268                sent_usec_in: arr[0],
269                sendme_received_usec_in: arr[1],
270                cwnd_in: arr[2] as u32,
271                ss_in: arr[3] == 1,
272                last_rtt_usec_out: arr[4],
273                ewma_rtt_usec_out: arr[5],
274                min_rtt_usec_out: arr[6],
275            }
276        }
277    }
278    impl RttTestSample {
279        fn test(&self, estimator: &mut RoundtripTimeEstimator, start: Instant) {
280            let state = if self.ss_in {
281                State::SlowStart
282            } else {
283                State::Steady
284            };
285            let mut cwnd = new_cwnd();
286            cwnd.set(self.cwnd_in);
287            let sent = start + Duration::from_micros(self.sent_usec_in);
288            let sendme_received = start + Duration::from_micros(self.sendme_received_usec_in);
289
290            estimator.expect_sendme(sent);
291            estimator
292                .update(sendme_received, &state, &cwnd)
293                .expect("Error on RTT update");
294            assert_eq!(
295                estimator.last_rtt,
296                Duration::from_micros(self.last_rtt_usec_out)
297            );
298            assert_eq!(
299                estimator.ewma_rtt,
300                Duration::from_micros(self.ewma_rtt_usec_out)
301            );
302            assert_eq!(
303                estimator.min_rtt,
304                Duration::from_micros(self.min_rtt_usec_out)
305            );
306        }
307    }
308
309    #[test]
310    fn test_vectors() {
311        let mut rtt = new_rtt_estimator();
312        let now = Instant::now();
313        // from C-tor src/test/test_congestion_control.c
314        let vectors = [
315            [100000, 200000, 124, 1, 100000, 100000, 100000],
316            [200000, 300000, 124, 1, 100000, 100000, 100000],
317            [350000, 500000, 124, 1, 150000, 133333, 100000],
318            [500000, 550000, 124, 1, 50000, 77777, 77777],
319            [600000, 700000, 124, 1, 100000, 92592, 77777],
320            [700000, 750000, 124, 1, 50000, 64197, 64197],
321            [750000, 875000, 124, 0, 125000, 104732, 104732],
322            [875000, 900000, 124, 0, 25000, 51577, 104732],
323            [900000, 950000, 200, 0, 50000, 50525, 50525],
324        ];
325        for vect in vectors {
326            let vect = RttTestSample::from(vect);
327            eprintln!("Testing vector: {:?}", vect);
328            vect.test(&mut rtt, now);
329        }
330    }
331}