1
//! Congestion control subsystem.
2
//!
3
//! This object is attached to a circuit hop (CircHop) and controls the logic for the congestion
4
//! control support of the Tor Network. It also manages the circuit level SENDME logic which is
5
//! part of congestion control.
6
//!
7
//! # Implementation
8
//!
9
//! In essence, this subsystem is notified whenever a DATA cell is received or sent. This
10
//! in turn updates the congestion control state so that the very important
11
//! [`can_send`](CongestionControl::can_send) function can accurately decide if a DATA cell can be
12
//! sent or not.
13
//!
14
//! Any part of the arti code that wants to send a DATA cell on the wire needs to call
15
//! [`can_send`](CongestionControl::can_send) beforehand, or else we risk leaving the circuit in a
16
//! protocol violation state.
17
//!
18
//! Furthermore, as we receive and emit SENDMEs, it also has entry points for those two events in
19
//! order to update the state.
20

            
21
#[cfg(any(test, feature = "testing"))]
22
pub(crate) mod test_utils;
23

            
24
mod fixed;
25
pub mod params;
26
mod rtt;
27
pub(crate) mod sendme;
28
mod vegas;
29

            
30
use crate::{Error, Result};
31

            
32
use self::{
33
    params::{Algorithm, CongestionControlParams, CongestionWindowParams},
34
    rtt::RoundtripTimeEstimator,
35
    sendme::SendmeValidator,
36
};
37
use tor_cell::relaycell::msg::SendmeTag;
38
use tor_rtcompat::{DynTimeProvider, SleepProvider};
39

            
40
/// This trait defines what a congestion control algorithm must implement in order to interface
41
/// with the circuit reactor.
42
///
43
/// Note that all functions informing the algorithm, as in not getters, return a Result meaning
44
/// that on error, it means we can't recover or that there is a protocol violation. In both
45
/// cases, the circuit MUST be closed.
46
pub(crate) trait CongestionControlAlgorithm: Send + std::fmt::Debug {
47
    /// Return true iff this algorithm uses stream level SENDMEs.
48
    fn uses_stream_sendme(&self) -> bool;
49
    /// Return true iff this algorithm uses stream level XON/XOFFs.
50
    fn uses_xon_xoff(&self) -> bool;
51
    /// Return true iff the next cell is expected to be a SENDME.
52
    fn is_next_cell_sendme(&self) -> bool;
53
    /// Return true iff a cell can be sent on the wire according to the congestion control
54
    /// algorithm.
55
    fn can_send(&self) -> bool;
56
    /// Return the congestion window object. The reason is returns an Option is because not all
57
    /// algorithm uses one and so we avoid acting on it if so.
58
    fn cwnd(&self) -> Option<&CongestionWindow>;
59

            
60
    /// Inform the algorithm that we just got a DATA cell.
61
    ///
62
    /// Return true if a SENDME should be sent immediately or false if not.
63
    fn data_received(&mut self) -> Result<bool>;
64
    /// Inform the algorithm that we just sent a DATA cell.
65
    fn data_sent(&mut self) -> Result<()>;
66
    /// Inform the algorithm that we've just received a SENDME.
67
    ///
68
    /// This is a core function because the algorithm massively update its state when receiving a
69
    /// SENDME by using the RTT value and congestion signals.
70
    fn sendme_received(
71
        &mut self,
72
        state: &mut State,
73
        rtt: &mut RoundtripTimeEstimator,
74
        signals: CongestionSignals,
75
    ) -> Result<()>;
76
    /// Inform the algorithm that we just sent a SENDME.
77
    fn sendme_sent(&mut self) -> Result<()>;
78

            
79
    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
80
    ///
81
    /// Optional, because not all algorithms track this.
82
    #[cfg(feature = "conflux")]
83
    fn inflight(&self) -> Option<u32>;
84

            
85
    /// Test Only: Return the congestion window.
86
    #[cfg(test)]
87
    fn send_window(&self) -> u32;
88

            
89
    /// Return the congestion control [`Algorithm`] implemented by this type.
90
    fn algorithm(&self) -> Algorithm;
91
}
92

            
93
/// Congestion signals handed to a congestion control algorithm so it can make decisions.
///
/// Each signal reflects some state of our internals. This is not an exhaustive list.
#[derive(Copy, Clone)]
pub(crate) struct CongestionSignals {
    /// Indicate if the channel is blocked.
    pub(crate) channel_blocked: bool,
    /// The size of the channel outbound queue.
    pub(crate) channel_outbound_size: u32,
}

impl CongestionSignals {
    /// Build a set of signals from the current channel state.
    ///
    /// `channel_outbound_size` is stored as a `u32`. A size that does not fit saturates to
    /// `u32::MAX` rather than being truncated: a truncating cast would wrap, so a queue of
    /// exactly 2^32 entries would be reported as empty.
    pub(crate) fn new(channel_blocked: bool, channel_outbound_size: usize) -> Self {
        Self {
            channel_blocked,
            channel_outbound_size: u32::try_from(channel_outbound_size).unwrap_or(u32::MAX),
        }
    }
}
112

            
113
/// The phase a congestion control algorithm is in.
#[derive(Copy, Clone, Default)]
pub(crate) enum State {
    /// Phase in which every circuit starts. The amount of data being transmitted is increased
    /// gradually in order to converge towards the optimal capacity.
    #[default]
    SlowStart,
    /// Steady phase, representing what we believe is optimal. Always comes after slow start.
    Steady,
}

impl State {
    /// Whether we are still in the [`State::SlowStart`] phase.
    pub(crate) fn in_slow_start(&self) -> bool {
        match *self {
            State::SlowStart => true,
            State::Steady => false,
        }
    }
}
130

            
131
/// A congestion window. This is generic for all algorithms but their parameters' value will differ
132
/// depending on the selected algorithm.
133
#[derive(Clone, Debug)]
134
pub(crate) struct CongestionWindow {
135
    /// Congestion window parameters from the consensus.
136
    params: CongestionWindowParams,
137
    /// The actual value of our congestion window.
138
    value: u32,
139
    /// The congestion window is full.
140
    is_full: bool,
141
}
142

            
143
impl CongestionWindow {
144
    /// Constructor taking consensus parameters.
145
300
    fn new(params: &CongestionWindowParams) -> Self {
146
300
        Self {
147
300
            value: params.cwnd_init(),
148
300
            params: params.clone(),
149
300
            is_full: false,
150
300
        }
151
300
    }
152

            
153
    /// Decrement the window by the increment value.
154
38
    pub(crate) fn dec(&mut self) {
155
38
        self.value = self
156
38
            .value
157
38
            .saturating_sub(self.increment())
158
38
            .max(self.params.cwnd_min());
159
38
    }
160

            
161
    /// Increment the window by the increment value.
162
12
    pub(crate) fn inc(&mut self) {
163
12
        self.value = self
164
12
            .value
165
12
            .saturating_add(self.increment())
166
12
            .min(self.params.cwnd_max());
167
12
    }
168

            
169
    /// Return the current value.
170
11990
    pub(crate) fn get(&self) -> u32 {
171
11990
        self.value
172
11990
    }
173

            
174
    /// Return the expected rate for which the congestion window should be updated at.
175
    ///
176
    /// See `CWND_UPDATE_RATE` in prop324.
177
498
    pub(crate) fn update_rate(&self, state: &State) -> u32 {
178
498
        if state.in_slow_start() {
179
366
            1
180
        } else {
181
132
            (self.get() + self.increment_rate() * self.sendme_inc() / 2)
182
132
                / (self.increment_rate() * self.sendme_inc())
183
        }
184
498
    }
185

            
186
    /// Return minimum value of the congestion window.
187
158
    pub(crate) fn min(&self) -> u32 {
188
158
        self.params.cwnd_min()
189
158
    }
190

            
191
    /// Set the congestion window value with a new value.
192
36
    pub(crate) fn set(&mut self, value: u32) {
193
36
        self.value = value;
194
36
    }
195

            
196
    /// Return the increment value.
197
136
    pub(crate) fn increment(&self) -> u32 {
198
136
        self.params.cwnd_inc()
199
136
    }
200

            
201
    /// Return the rate at which we should increment the window.
202
346
    pub(crate) fn increment_rate(&self) -> u32 {
203
346
        self.params.cwnd_inc_rate()
204
346
    }
205

            
206
    /// Return true iff this congestion window is full.
207
224
    pub(crate) fn is_full(&self) -> bool {
208
224
        self.is_full
209
224
    }
210

            
211
    /// Reset the full flag meaning it is now not full.
212
26
    pub(crate) fn reset_full(&mut self) {
213
26
        self.is_full = false;
214
26
    }
215

            
216
    /// Return the number of expected SENDMEs per congestion window.
217
    ///
218
    /// Spec: prop324 SENDME_PER_CWND definition
219
262
    pub(crate) fn sendme_per_cwnd(&self) -> u32 {
220
262
        (self.get() + (self.sendme_inc() / 2)) / self.sendme_inc()
221
262
    }
222

            
223
    /// Return the RFC3742 slow start increment value.
224
    ///
225
    /// Spec: prop324 rfc3742_ss_inc definition
226
80
    pub(crate) fn rfc3742_ss_inc(&mut self, ss_cap: u32) -> u32 {
227
80
        let inc = if self.get() <= ss_cap {
228
74
            ((self.params.cwnd_inc_pct_ss().as_percent() * self.sendme_inc()) + 50) / 100
229
        } else {
230
6
            (((self.sendme_inc() * ss_cap) + self.get()) / (self.get() * 2)).max(1)
231
        };
232
80
        self.value += inc;
233
80
        inc
234
80
    }
235

            
236
    /// Evaluate the fullness of the window with the given parameters.
237
    ///
238
    /// Spec: prop324 see cwnd_is_full and cwnd_is_nonfull definition.
239
    /// C-tor: cwnd_became_full() and cwnd_became_nonfull()
240
156
    pub(crate) fn eval_fullness(&mut self, inflight: u32, full_gap: u32, full_minpct: u32) {
241
156
        if (inflight + (self.sendme_inc() * full_gap)) >= self.get() {
242
102
            self.is_full = true;
243
102
        } else if (100 * inflight) < (full_minpct * self.get()) {
244
32
            self.is_full = false;
245
32
        }
246
156
    }
247

            
248
    /// Return the SENDME increment value.
249
2662
    pub(crate) fn sendme_inc(&self) -> u32 {
250
2662
        self.params.sendme_inc()
251
2662
    }
252

            
253
    /// Return the congestion window params.
254
    #[cfg(any(test, feature = "conflux"))]
255
32
    pub(crate) fn params(&self) -> &CongestionWindowParams {
256
32
        &self.params
257
32
    }
258
}
259

            
260
/// Congestion control state of a hop on a circuit.
261
///
262
/// This controls the entire logic of congestion control and circuit level SENDMEs.
263
pub(crate) struct CongestionControl {
264
    /// Which congestion control state are we in?
265
    state: State,
266
    /// This is the SENDME validator as in it keeps track of the circuit tag found within an
267
    /// authenticated SENDME cell. It can store the tags and validate a tag against our queue of
268
    /// expected values.
269
    sendme_validator: SendmeValidator<SendmeTag>,
270
    /// The RTT estimator for the circuit we are attached on.
271
    rtt: RoundtripTimeEstimator,
272
    /// The congestion control algorithm.
273
    algorithm: Box<dyn CongestionControlAlgorithm>,
274
}
275

            
276
impl CongestionControl {
277
    /// Construct a new CongestionControl
278
744
    pub(crate) fn new(params: &CongestionControlParams) -> Self {
279
744
        let state = State::default();
280
        // Use what the consensus tells us to use.
281
744
        let algorithm: Box<dyn CongestionControlAlgorithm> = match params.alg() {
282
472
            Algorithm::FixedWindow(p) => Box::new(fixed::FixedWindow::new(*p)),
283
272
            Algorithm::Vegas(ref p) => {
284
272
                let cwnd = CongestionWindow::new(params.cwnd_params());
285
272
                Box::new(vegas::Vegas::new(*p, &state, cwnd))
286
            }
287
        };
288
744
        Self {
289
744
            algorithm,
290
744
            rtt: RoundtripTimeEstimator::new(params.rtt_params()),
291
744
            sendme_validator: SendmeValidator::new(),
292
744
            state,
293
744
        }
294
744
    }
295

            
296
    /// Return true iff the underlying algorithm uses stream level SENDMEs.
297
    /// At the moment, only FixedWindow uses it. It has been eliminated with Vegas.
298
92
    pub(crate) fn uses_stream_sendme(&self) -> bool {
299
92
        self.algorithm.uses_stream_sendme()
300
92
    }
301

            
302
    /// Return true iff the underlying algorithm uses stream level XON/XOFFs.
303
    /// At the moment, only Vegas uses it.
304
168
    pub(crate) fn uses_xon_xoff(&self) -> bool {
305
168
        self.algorithm.uses_xon_xoff()
306
168
    }
307

            
308
    /// Return true iff a DATA cell is allowed to be sent based on the congestion control state.
309
20948
    pub(crate) fn can_send(&self) -> bool {
310
20948
        self.algorithm.can_send()
311
20948
    }
312

            
313
    /// Called when a SENDME cell is received.
314
    ///
315
    /// An error is returned if there is a protocol violation with regards to congestion control.
316
44
    pub(crate) fn note_sendme_received(
317
44
        &mut self,
318
44
        runtime: &DynTimeProvider,
319
44
        tag: SendmeTag,
320
44
        signals: CongestionSignals,
321
44
    ) -> Result<()> {
322
44
        // This MUST be the first thing that we do that is validate the SENDME. Any error leads to
323
44
        // closing the circuit.
324
44
        self.sendme_validator.validate(Some(tag))?;
325

            
326
40
        let now = runtime.now();
327
        // Update our RTT estimate if the algorithm yields back a congestion window. RTT
328
        // measurements only make sense for a congestion window. For example, FixedWindow here
329
        // doesn't use it and so no need for the RTT.
330
40
        if let Some(cwnd) = self.algorithm.cwnd() {
331
36
            self.rtt
332
36
                .update(now, &self.state, cwnd)
333
36
                .map_err(|e| Error::CircProto(e.to_string()))?;
334
4
        }
335

            
336
        // Notify the algorithm that we've received a SENDME.
337
40
        self.algorithm
338
40
            .sendme_received(&mut self.state, &mut self.rtt, signals)
339
44
    }
340

            
341
    /// Called when a SENDME cell is sent.
342
    pub(crate) fn note_sendme_sent(&mut self) -> Result<()> {
343
        self.algorithm.sendme_sent()
344
    }
345

            
346
    /// Called when a DATA cell is received.
347
    ///
348
    /// Returns true iff a SENDME should be sent false otherwise. An error is returned if there is
349
    /// a protocol violation with regards to flow or congestion control.
350
76
    pub(crate) fn note_data_received(&mut self) -> Result<bool> {
351
76
        self.algorithm.data_received()
352
76
    }
353

            
354
    /// Called when a DATA cell is sent.
355
    ///
356
    /// An error is returned if there is a protocol violation with regards to flow or congestion
357
    /// control.
358
4016
    pub(crate) fn note_data_sent<U>(&mut self, runtime: &DynTimeProvider, tag: &U) -> Result<()>
359
4016
    where
360
4016
        U: Clone + Into<SendmeTag>,
361
4016
    {
362
4016
        // Inform the algorithm that the data was just sent. This is important to be the very first
363
4016
        // thing so the congestion window can be updated accordingly making the following calls
364
4016
        // using the latest data.
365
4016
        self.algorithm.data_sent()?;
366

            
367
        // If next cell is a SENDME, we need to record the tag of this cell in order to validate
368
        // the next SENDME when it arrives.
369
4016
        if self.algorithm.is_next_cell_sendme() {
370
60
            self.sendme_validator.record(tag);
371
60
            // Only keep the SENDME timestamp if the algorithm has a congestion window.
372
60
            if self.algorithm.cwnd().is_some() {
373
36
                self.rtt.expect_sendme(runtime.now());
374
36
            }
375
3956
        }
376

            
377
4016
        Ok(())
378
4016
    }
379

            
380
    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
381
    ///
382
    /// Optional, because not all algorithms track this.
383
    #[cfg(feature = "conflux")]
384
16
    pub(crate) fn inflight(&self) -> Option<u32> {
385
16
        self.algorithm.inflight()
386
16
    }
387

            
388
    /// Return the congestion window object.
389
    ///
390
    /// Optional, because not all algorithms track this.
391
    #[cfg(feature = "conflux")]
392
16
    pub(crate) fn cwnd(&self) -> Option<&CongestionWindow> {
393
16
        self.algorithm.cwnd()
394
16
    }
395

            
396
    /// Return a reference to the RTT estimator.
397
    ///
398
    /// Used for conflux, for choosing the best circuit to send on.
399
    #[cfg(feature = "conflux")]
400
2400
    pub(crate) fn rtt(&self) -> &RoundtripTimeEstimator {
401
2400
        &self.rtt
402
2400
    }
403

            
404
    /// Return the congestion control algorithm.
405
    #[cfg(feature = "conflux")]
406
48
    pub(crate) fn algorithm(&self) -> Algorithm {
407
48
        self.algorithm.algorithm()
408
48
    }
409
}
410

            
411
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::congestion::test_utils::new_cwnd;

    use super::CongestionControl;
    use tor_cell::relaycell::msg::SendmeTag;

    impl CongestionControl {
        /// Test helper: return a copy of the current send window together with the tags we
        /// expect to receive.
        pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
            let window = self.algorithm.send_window();
            let tags = self.sendme_validator.expected_tags();
            (window, tags)
        }
    }

    #[test]
    fn test_cwnd() {
        let mut cwnd = new_cwnd();

        // Capture the parameters the window was built from.
        let init = cwnd.params().cwnd_init();
        let step = cwnd.params().cwnd_inc();

        // The getters must agree with the parameters used at initialization.
        assert_eq!(cwnd.get(), init);
        assert_eq!(cwnd.min(), cwnd.params().cwnd_min());
        assert_eq!(cwnd.increment(), step);
        assert_eq!(cwnd.increment_rate(), cwnd.params().cwnd_inc_rate());
        assert_eq!(cwnd.sendme_inc(), cwnd.params().sendme_inc());
        assert!(!cwnd.is_full());

        // One increment followed by one decrement must bring us back to the initial value.
        cwnd.inc();
        assert_eq!(cwnd.get(), init + step);
        cwnd.dec();
        assert_eq!(cwnd.get(), init);
    }
}