1
//! Round Trip Time measurement (§ 2.1)
2

            
3
use std::cmp::{max, min};
4
use std::collections::VecDeque;
5
use std::sync::atomic::{AtomicBool, Ordering};
6
use std::time::{Duration, Instant};
7

            
8
use super::params::RoundTripEstimatorParams;
9
use super::{CongestionWindow, State};
10

            
11
use thiserror::Error;
12
use tor_error::{ErrorKind, HasKind};
13

            
14
/// An error originating from the tor-congestion crate.
15
#[derive(Error, Debug, Clone)]
16
#[non_exhaustive]
17
pub(crate) enum Error {
18
    /// A call to `RoundtripTimeEstimator::sendme_received` was made without calling
19
    /// `RoundtripTimeEstimator::expect_sendme` first.
20
    #[error("Informed of a SENDME we weren't expecting")]
21
    MismatchedEstimationCall,
22
}
23

            
24
impl HasKind for Error {
25
    fn kind(&self) -> ErrorKind {
26
        use Error as E;
27
        match self {
28
            E::MismatchedEstimationCall => ErrorKind::TorProtocolViolation,
29
        }
30
    }
31
}
32

            
33
/// Provides an estimate of the round-trip time (RTT) of a Tor circuit.
34
#[derive(Debug)]
35
#[allow(dead_code)]
36
pub(crate) struct RoundtripTimeEstimator {
37
    /// A queue of times we sent a cell that we'd expect a SENDME for.
38
    ///
39
    /// When a data cell is sent and for which we expect a SENDME next, the timestamp at the send
40
    /// is kept in this queue so we can use it to measure the RTT when the SENDME is received.
41
    ///
42
    /// A queue is used here because the protocol allows to send all pending SENDMEs at once as
43
    /// long as it is within one congestion window.
44
    sendme_expected_from: VecDeque<Instant>,
45
    /// The last *measured* round-trip time.
46
    ///
47
    /// This is `None` iff we have not managed to get any estimate yet.
48
    last_rtt: Option<Duration>,
49
    /// The current smoothed *estimate* of what the round-trip time is.
50
    ///
51
    /// This is `None` iff we have not managed to get any estimate yet.
52
    ewma_rtt: Option<Duration>,
53
    /// The minimum observed value of `last_rtt`.
54
    ///
55
    /// This is `None` iff we have not managed to get any estimate yet.
56
    min_rtt: Option<Duration>,
57
    /// The maximum observed value of `last_rtt`.
58
    ///
59
    /// This is `None` iff we have not managed to get any estimate yet.
60
    max_rtt: Option<Duration>,
61
    /// The network parameters we're using.
62
    params: RoundTripEstimatorParams,
63
    /// A reference to a shared boolean for storing if the clock is stalled or not.
64
    /// Spec: CLOCK_HEURISTICS from prop324. See is_clock_stalled() for the implementation.
65
    clock_stalled: AtomicBool,
66
}
67

            
68
#[allow(dead_code)]
69
impl RoundtripTimeEstimator {
70
    /// Create a new `RoundtripTimeEstimator`, using a set of `NetParameters` and a shared boolean
71
    /// to cache clock stalled state in.
72
1030
    pub(crate) fn new(params: &RoundTripEstimatorParams) -> Self {
73
1030
        Self {
74
1030
            sendme_expected_from: Default::default(),
75
1030
            last_rtt: None,
76
1030
            ewma_rtt: None,
77
1030
            min_rtt: None,
78
1030
            max_rtt: None,
79
1030
            params: params.clone(),
80
1030
            clock_stalled: AtomicBool::default(),
81
1030
        }
82
1030
    }
83

            
84
    /// Return true iff the estimator is ready to be used or read.
85
312
    pub(crate) fn is_ready(&self) -> bool {
86
312
        !self.clock_stalled() && self.last_rtt.is_some()
87
312
    }
88

            
89
    /// Return the state of the clock stalled indicator.
90
468
    pub(crate) fn clock_stalled(&self) -> bool {
91
468
        self.clock_stalled.load(Ordering::SeqCst)
92
468
    }
93

            
94
    /// Return the EWMA RTT in usec or `None` if we don't have an estimate yet.
95
2676
    pub(crate) fn ewma_rtt_usec(&self) -> Option<u32> {
96
2676
        self.ewma_rtt
97
3270
            .map(|rtt| u32::try_from(rtt.as_micros()).ok().unwrap_or(u32::MAX))
98
2676
    }
99

            
100
    /// Return the Minimum RTT in usec or `None` if we don't have an estimate yet.
101
276
    pub(crate) fn min_rtt_usec(&self) -> Option<u32> {
102
276
        self.min_rtt
103
414
            .map(|rtt| u32::try_from(rtt.as_micros()).ok().unwrap_or(u32::MAX))
104
276
    }
105

            
106
    /// Return the maximum observed RTT in usec or `None` if we don't have an estimate yet.
107
66
    pub(crate) fn max_rtt_usec(&self) -> Option<u32> {
108
66
        self.max_rtt
109
66
            .map(|rtt| u32::try_from(rtt.as_micros()).ok().unwrap_or(u32::MAX))
110
66
    }
111

            
112
    /// Inform the estimator that we did (at time `now`) something that we'll expect a SENDME to
113
    /// be received for.
114
174
    pub(crate) fn expect_sendme(&mut self, now: Instant) {
115
174
        self.sendme_expected_from.push_back(now);
116
174
    }
117

            
118
    /// Return whether we can use heuristics to sanity-check RTT values against our EWMA value.
119
    /// Spec: 2.1.1. Clock Jump Heuristics CLOCK_HEURISTICS
120
    ///
121
    /// Used in [`is_clock_stalled`](RoundtripTimeEstimator::is_clock_stalled), to check the sanity of
122
    /// a newly measured RTT value.
123
174
    fn can_crosscheck_with_current_estimate(&self, in_slow_start: bool) -> bool {
124
        // If we're in slow start, we don't perform any sanity checks, as per spec. If we don't
125
        // have a current estimate, we can't use it for sanity checking, because it doesn't
126
        // exist.
127
174
        !in_slow_start && self.ewma_rtt.is_some()
128
174
    }
129

            
130
    /// Given a raw RTT value we just observed, compute whether or not we think the clock has
131
    /// stalled or jumped, and we should throw it out as a result.
132
174
    fn is_clock_stalled(&self, raw_rtt: Duration, in_slow_start: bool) -> bool {
133
174
        if raw_rtt.is_zero() {
134
            // Clock is stalled.
135
            self.clock_stalled.store(true, Ordering::SeqCst);
136
            true
137
174
        } else if self.can_crosscheck_with_current_estimate(in_slow_start) {
138
34
            let ewma_rtt = self
139
34
                .ewma_rtt
140
34
                .expect("ewma_rtt was not checked by can_crosscheck_with_current_estimate?!");
141

            
142
            /// Discrepancy ratio of a new RTT value that we allow against the current RTT in order
143
            /// to declare if the clock has stalled or not. This value is taken from proposal 324
144
            /// section 2.1.1 CLOCK_HEURISTICS and has the same name as in C-tor.
145
            const DELTA_DISCREPANCY_RATIO_MAX: u32 = 5000;
146
            // If we have enough data, check the sanity of our measurement against our EWMA value.
147
34
            if raw_rtt > ewma_rtt * DELTA_DISCREPANCY_RATIO_MAX {
148
                // The clock significantly jumped forward.
149
                //
150
                // Don't update the global cache, though, since this is triggerable over the
151
                // network.
152
                //
153
                // FIXME(eta): We should probably log something here?
154
                true
155
34
            } else if ewma_rtt > raw_rtt * DELTA_DISCREPANCY_RATIO_MAX {
156
                // The clock might have stalled. We can't really make a decision just off this
157
                // one measurement, though, so we'll use the stored stall value.
158
                self.clock_stalled.load(Ordering::SeqCst)
159
            } else {
160
                // If we got here, we're not stalled.
161
34
                self.clock_stalled.store(false, Ordering::SeqCst);
162
34
                false
163
            }
164
        } else {
165
            // If we don't have enough measurements to sanity check, assume it's okay.
166
140
            false
167
        }
168
174
    }
169

            
170
    /// Update the estimator on time `now` and at the congestion window `cwnd`.
171
    ///
172
    /// # Errors
173
    ///
174
    /// Each call to this function removes an entry from `sendme_expected_from` (the entries are
175
    /// added using [`sendme_expected_from`](Self::sendme_expected_from)).
176
    ///
177
    /// Returns an error if are not expecting any SENDMEs at this time (if `expect_sendme` was
178
    /// never called, or if we have exhausted all `sendme_expected_from` added by previous
179
    /// `expect_sendme` calls).
180
    ///
181
    /// Spec: prop324 section 2.1 C-tor: congestion_control_update_circuit_rtt() in
182
    /// congestion_control_common.c
183
174
    pub(crate) fn update(
184
174
        &mut self,
185
174
        now: Instant,
186
174
        state: &State,
187
174
        cwnd: &CongestionWindow,
188
174
    ) -> Result<(), Error> {
189
174
        let data_sent_at = self
190
174
            .sendme_expected_from
191
174
            .pop_front()
192
174
            .ok_or(Error::MismatchedEstimationCall)?;
193
174
        let raw_rtt = now.saturating_duration_since(data_sent_at);
194

            
195
174
        if self.is_clock_stalled(raw_rtt, state.in_slow_start()) {
196
            return Ok(());
197
174
        }
198

            
199
174
        self.max_rtt = self.max_rtt.max(Some(raw_rtt));
200
174
        self.last_rtt = Some(raw_rtt);
201

            
202
        // This is the "N" for N-EWMA.
203
174
        let ewma_n = u64::from(if state.in_slow_start() {
204
140
            self.params.ewma_ss_max()
205
        } else {
206
34
            min(
207
34
                (cwnd.update_rate(state) * (self.params.ewma_cwnd_pct().as_percent())) / 100,
208
34
                self.params.ewma_max(),
209
            )
210
        });
211
174
        let ewma_n = max(ewma_n, 2);
212

            
213
        // Get the USEC values.
214
174
        let raw_rtt_usec = raw_rtt.as_micros() as u64;
215
252
        let prev_ewma_rtt_usec = self.ewma_rtt.map(|rtt| rtt.as_micros() as u64);
216

            
217
        // This is the actual EWMA calculation.
218
        // C-tor simplifies this as follows for rounding error reasons:
219
        //
220
        // EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1)
221
        //      = (value*2 + EWMA_prev*(N-1))/(N+1)
222
        //
223
        // Spec: prop324 section 2.1.2 (N_EWMA_SMOOTHING)
224
174
        let new_ewma_rtt_usec = match prev_ewma_rtt_usec {
225
18
            None => raw_rtt_usec,
226
156
            Some(prev_ewma_rtt_usec) => {
227
156
                ((raw_rtt_usec * 2) + ((ewma_n - 1) * prev_ewma_rtt_usec)) / (ewma_n + 1)
228
            }
229
        };
230
174
        let ewma_rtt = Duration::from_micros(new_ewma_rtt_usec);
231
174
        self.ewma_rtt = Some(ewma_rtt);
232

            
233
174
        let Some(min_rtt) = self.min_rtt else {
234
18
            self.min_rtt = self.ewma_rtt;
235
18
            return Ok(());
236
        };
237

            
238
156
        if cwnd.get() == cwnd.min() && !state.in_slow_start() {
239
4
            // The cast is OK even if lossy, we only care about the usec level.
240
4
            let max = max(ewma_rtt, min_rtt).as_micros() as u64;
241
4
            let min = min(ewma_rtt, min_rtt).as_micros() as u64;
242
4
            let rtt_reset_pct = u64::from(self.params.rtt_reset_pct().as_percent());
243
4
            let min_rtt = Duration::from_micros(
244
4
                (rtt_reset_pct * max / 100) + (100 - rtt_reset_pct) * min / 100,
245
4
            );
246
4

            
247
4
            self.min_rtt = Some(min_rtt);
248
152
        } else if self.ewma_rtt < self.min_rtt {
249
34
            self.min_rtt = self.ewma_rtt;
250
118
        }
251

            
252
156
        Ok(())
253
174
    }
254
}
255

            
256
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use std::time::{Duration, Instant};

    use crate::congestion::test_utils::{new_cwnd, new_rtt_estimator};

    use super::*;

    /// One test vector: the inputs fed to the estimator (`*_in`) and the values it is expected
    /// to hold afterwards (`*_out`). All times are in microseconds.
    #[derive(Debug)]
    struct RttTestSample {
        sent_usec_in: u64,
        sendme_received_usec_in: u64,
        cwnd_in: u32,
        ss_in: bool,
        last_rtt_usec_out: u64,
        ewma_rtt_usec_out: u64,
        min_rtt_usec_out: u64,
    }

    impl From<[u64; 7]> for RttTestSample {
        /// Build a sample from a row laid out as
        /// `[sent, received, cwnd, slow_start (1 = yes), last_rtt, ewma_rtt, min_rtt]`.
        fn from(arr: [u64; 7]) -> Self {
            let [sent, received, cwnd, slow_start, last_rtt, ewma_rtt, min_rtt] = arr;
            Self {
                sent_usec_in: sent,
                sendme_received_usec_in: received,
                cwnd_in: cwnd as u32,
                ss_in: slow_start == 1,
                last_rtt_usec_out: last_rtt,
                ewma_rtt_usec_out: ewma_rtt,
                min_rtt_usec_out: min_rtt,
            }
        }
    }

    impl RttTestSample {
        /// Feed this sample to `estimator` (with times relative to `start`) and check the
        /// resulting last, EWMA and minimum RTT values.
        fn test(&self, estimator: &mut RoundtripTimeEstimator, start: Instant) {
            let usec = Duration::from_micros;
            let at = |offset_usec: u64| start + usec(offset_usec);

            let state = if self.ss_in {
                State::SlowStart
            } else {
                State::Steady
            };
            let mut cwnd = new_cwnd();
            cwnd.set(self.cwnd_in);

            let sent = at(self.sent_usec_in);
            let sendme_received = at(self.sendme_received_usec_in);

            estimator.expect_sendme(sent);
            estimator
                .update(sendme_received, &state, &cwnd)
                .expect("Error on RTT update");

            assert_eq!(estimator.last_rtt, Some(usec(self.last_rtt_usec_out)));
            assert_eq!(estimator.ewma_rtt, Some(usec(self.ewma_rtt_usec_out)));
            assert_eq!(estimator.min_rtt, Some(usec(self.min_rtt_usec_out)));
        }
    }

    #[test]
    fn test_vectors() {
        let mut estimator = new_rtt_estimator();
        let start = Instant::now();
        // from C-tor src/test/test_congestion_control.c
        let vectors: [[u64; 7]; 9] = [
            [100000, 200000, 124, 1, 100000, 100000, 100000],
            [200000, 300000, 124, 1, 100000, 100000, 100000],
            [350000, 500000, 124, 1, 150000, 133333, 100000],
            [500000, 550000, 124, 1, 50000, 77777, 77777],
            [600000, 700000, 124, 1, 100000, 92592, 77777],
            [700000, 750000, 124, 1, 50000, 64197, 64197],
            [750000, 875000, 124, 0, 125000, 104732, 104732],
            [875000, 900000, 124, 0, 25000, 51577, 104732],
            [900000, 950000, 200, 0, 50000, 50525, 50525],
        ];
        for row in vectors {
            let sample = RttTestSample::from(row);
            eprintln!("Testing vector: {:?}", sample);
            sample.test(&mut estimator, start);
        }
    }
}