tor_proto/
congestion.rs

1//! Congestion control subsystem.
2//!
3//! This object is attached to a circuit hop (CircHop) and controls the logic for the congestion
4//! control support of the Tor Network. It also manages the circuit level SENDME logic which is
5//! part of congestion control.
6//!
7//! # Implementation
8//!
9//! The basics of this subsystem is that it is notified when a DATA cell is received or sent. This
10//! in turn updates the congestion control state so that the very important
11//! [`can_send`](CongestionControl::can_send) function be accurate to decide if a DATA cell can be
12//! sent or not.
13//!
14//! Any part of the arti code that wants to send a DATA cell on the wire needs to call
15//! [`can_send`](CongestionControl::can_send) before else we'll risk leaving the circuit in a
16//! protocol violation state.
17//!
18//! Furthermore, as we receive and emit SENDMEs, it also has entry point for those two events in
19//! order to update the state.
20
21#[cfg(any(test, feature = "testing"))]
22pub(crate) mod test_utils;
23
24mod fixed;
25pub mod params;
26mod rtt;
27pub(crate) mod sendme;
28mod vegas;
29
30use std::time::Instant;
31
32use crate::{Error, Result};
33
34use self::{
35    params::{Algorithm, CongestionControlParams, CongestionWindowParams},
36    rtt::RoundtripTimeEstimator,
37    sendme::SendmeValidator,
38};
39use tor_cell::relaycell::msg::SendmeTag;
40
41/// This trait defines what a congestion control algorithm must implement in order to interface
42/// with the circuit reactor.
43///
44/// Note that all functions informing the algorithm, as in not getters, return a Result meaning
45/// that on error, it means we can't recover or that there is a protocol violation. In both
46/// cases, the circuit MUST be closed.
47pub(crate) trait CongestionControlAlgorithm: Send + std::fmt::Debug {
48    /// Return true iff this algorithm uses stream level SENDMEs.
49    fn uses_stream_sendme(&self) -> bool;
50    /// Return true iff the next cell is expected to be a SENDME.
51    fn is_next_cell_sendme(&self) -> bool;
52    /// Return true iff a cell can be sent on the wire according to the congestion control
53    /// algorithm.
54    fn can_send(&self) -> bool;
55    /// Return the congestion window object. The reason is returns an Option is because not all
56    /// algorithm uses one and so we avoid acting on it if so.
57    fn cwnd(&self) -> Option<&CongestionWindow>;
58
59    /// Inform the algorithm that we just got a DATA cell.
60    ///
61    /// Return true if a SENDME should be sent immediately or false if not.
62    fn data_received(&mut self) -> Result<bool>;
63    /// Inform the algorithm that we just sent a DATA cell.
64    fn data_sent(&mut self) -> Result<()>;
65    /// Inform the algorithm that we've just received a SENDME.
66    ///
67    /// This is a core function because the algorithm massively update its state when receiving a
68    /// SENDME by using the RTT value and congestion signals.
69    fn sendme_received(
70        &mut self,
71        state: &mut State,
72        rtt: &mut RoundtripTimeEstimator,
73        signals: CongestionSignals,
74    ) -> Result<()>;
75    /// Inform the algorithm that we just sent a SENDME.
76    fn sendme_sent(&mut self) -> Result<()>;
77
78    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
79    ///
80    /// Optional, because not all algorithms track this.
81    #[cfg(feature = "conflux")]
82    fn inflight(&self) -> Option<u32>;
83
84    /// Test Only: Return the congestion window.
85    #[cfg(test)]
86    fn send_window(&self) -> u32;
87
88    /// Return the congestion control [`Algorithm`] implemented by this type.
89    fn algorithm(&self) -> Algorithm;
90}
91
92/// These are congestion signals used by a congestion control algorithm to make decisions. These
93/// signals are various states of our internals. This is not an exhaustive list.
94#[derive(Copy, Clone)]
95pub(crate) struct CongestionSignals {
96    /// Indicate if the channel is blocked.
97    pub(crate) channel_blocked: bool,
98    /// The size of the channel outbound queue.
99    pub(crate) channel_outbound_size: u32,
100}
101
102impl CongestionSignals {
103    /// Constructor
104    pub(crate) fn new(channel_blocked: bool, channel_outbound_size: usize) -> Self {
105        Self {
106            channel_blocked,
107            channel_outbound_size: channel_outbound_size.saturating_add(0) as u32,
108        }
109    }
110}
111
112/// Congestion control state.
113#[derive(Copy, Clone, Default)]
114pub(crate) enum State {
115    /// The initial state any circuit starts in. Used to gradually increase the amount of data
116    /// being transmitted in order to converge towards to optimal capacity.
117    #[default]
118    SlowStart,
119    /// Steady state representing what we think is optimal. This is always after slow start.
120    Steady,
121}
122
123impl State {
124    /// Return true iff this is SlowStart.
125    pub(crate) fn in_slow_start(&self) -> bool {
126        matches!(self, State::SlowStart)
127    }
128}
129
130/// A congestion window. This is generic for all algorithms but their parameters' value will differ
131/// depending on the selected algorithm.
132#[derive(Clone, Debug)]
133pub(crate) struct CongestionWindow {
134    /// Congestion window parameters from the consensus.
135    params: CongestionWindowParams,
136    /// The actual value of our congestion window.
137    value: u32,
138    /// The congestion window is full.
139    is_full: bool,
140}
141
142impl CongestionWindow {
143    /// Constructor taking consensus parameters.
144    fn new(params: &CongestionWindowParams) -> Self {
145        Self {
146            value: params.cwnd_init(),
147            params: params.clone(),
148            is_full: false,
149        }
150    }
151
152    /// Decrement the window by the increment value.
153    pub(crate) fn dec(&mut self) {
154        self.value = self
155            .value
156            .saturating_sub(self.increment())
157            .max(self.params.cwnd_min());
158    }
159
160    /// Increment the window by the increment value.
161    pub(crate) fn inc(&mut self) {
162        self.value = self
163            .value
164            .saturating_add(self.increment())
165            .min(self.params.cwnd_max());
166    }
167
168    /// Return the current value.
169    pub(crate) fn get(&self) -> u32 {
170        self.value
171    }
172
173    /// Return the expected rate for which the congestion window should be updated at.
174    ///
175    /// See `CWND_UPDATE_RATE` in prop324.
176    pub(crate) fn update_rate(&self, state: &State) -> u32 {
177        if state.in_slow_start() {
178            1
179        } else {
180            (self.get() + self.increment_rate() * self.sendme_inc() / 2)
181                / (self.increment_rate() * self.sendme_inc())
182        }
183    }
184
185    /// Return minimum value of the congestion window.
186    pub(crate) fn min(&self) -> u32 {
187        self.params.cwnd_min()
188    }
189
190    /// Set the congestion window value with a new value.
191    pub(crate) fn set(&mut self, value: u32) {
192        self.value = value;
193    }
194
195    /// Return the increment value.
196    pub(crate) fn increment(&self) -> u32 {
197        self.params.cwnd_inc()
198    }
199
200    /// Return the rate at which we should increment the window.
201    pub(crate) fn increment_rate(&self) -> u32 {
202        self.params.cwnd_inc_rate()
203    }
204
205    /// Return true iff this congestion window is full.
206    pub(crate) fn is_full(&self) -> bool {
207        self.is_full
208    }
209
210    /// Reset the full flag meaning it is now not full.
211    pub(crate) fn reset_full(&mut self) {
212        self.is_full = false;
213    }
214
215    /// Return the number of expected SENDMEs per congestion window.
216    ///
217    /// Spec: prop324 SENDME_PER_CWND definition
218    pub(crate) fn sendme_per_cwnd(&self) -> u32 {
219        (self.get() + (self.sendme_inc() / 2)) / self.sendme_inc()
220    }
221
222    /// Return the RFC3742 slow start increment value.
223    ///
224    /// Spec: prop324 rfc3742_ss_inc definition
225    pub(crate) fn rfc3742_ss_inc(&mut self, ss_cap: u32) -> u32 {
226        let inc = if self.get() <= ss_cap {
227            ((self.params.cwnd_inc_pct_ss().as_percent() * self.sendme_inc()) + 50) / 100
228        } else {
229            (((self.sendme_inc() * ss_cap) + self.get()) / (self.get() * 2)).max(1)
230        };
231        self.value += inc;
232        inc
233    }
234
235    /// Evaluate the fullness of the window with the given parameters.
236    ///
237    /// Spec: prop324 see cwnd_is_full and cwnd_is_nonfull definition.
238    /// C-tor: cwnd_became_full() and cwnd_became_nonfull()
239    pub(crate) fn eval_fullness(&mut self, inflight: u32, full_gap: u32, full_minpct: u32) {
240        if (inflight + (self.sendme_inc() * full_gap)) >= self.get() {
241            self.is_full = true;
242        } else if (100 * inflight) < (full_minpct * self.get()) {
243            self.is_full = false;
244        }
245    }
246
247    /// Return the SENDME increment value.
248    pub(crate) fn sendme_inc(&self) -> u32 {
249        self.params.sendme_inc()
250    }
251
252    /// Return the congestion window params.
253    #[cfg(any(test, feature = "conflux"))]
254    pub(crate) fn params(&self) -> &CongestionWindowParams {
255        &self.params
256    }
257}
258
259/// Congestion control state of a hop on a circuit.
260///
261/// This controls the entire logic of congestion control and circuit level SENDMEs.
262pub(crate) struct CongestionControl {
263    /// Which congestion control state are we in?
264    state: State,
265    /// This is the SENDME validator as in it keeps track of the circuit tag found within an
266    /// authenticated SENDME cell. It can store the tags and validate a tag against our queue of
267    /// expected values.
268    sendme_validator: SendmeValidator<SendmeTag>,
269    /// The RTT estimator for the circuit we are attached on.
270    rtt: RoundtripTimeEstimator,
271    /// The congestion control algorithm.
272    algorithm: Box<dyn CongestionControlAlgorithm>,
273}
274
275impl CongestionControl {
276    /// Construct a new CongestionControl
277    pub(crate) fn new(params: &CongestionControlParams) -> Self {
278        let state = State::default();
279        // Use what the consensus tells us to use.
280        let algorithm: Box<dyn CongestionControlAlgorithm> = match params.alg() {
281            Algorithm::FixedWindow(p) => Box::new(fixed::FixedWindow::new(*p)),
282            Algorithm::Vegas(ref p) => {
283                let cwnd = CongestionWindow::new(params.cwnd_params());
284                Box::new(vegas::Vegas::new(*p, &state, cwnd))
285            }
286        };
287        Self {
288            algorithm,
289            rtt: RoundtripTimeEstimator::new(params.rtt_params()),
290            sendme_validator: SendmeValidator::new(),
291            state,
292        }
293    }
294
295    /// Return true iff the underlying algorithm uses stream level SENDMEs.
296    /// At the moment, only FixedWindow uses it. It has been eliminated with Vegas.
297    pub(crate) fn uses_stream_sendme(&self) -> bool {
298        self.algorithm.uses_stream_sendme()
299    }
300
301    /// Return true iff a DATA cell is allowed to be sent based on the congestion control state.
302    pub(crate) fn can_send(&self) -> bool {
303        self.algorithm.can_send()
304    }
305
306    /// Called when a SENDME cell is received.
307    ///
308    /// An error is returned if there is a protocol violation with regards to congestion control.
309    pub(crate) fn note_sendme_received(
310        &mut self,
311        tag: SendmeTag,
312        signals: CongestionSignals,
313    ) -> Result<()> {
314        // This MUST be the first thing that we do that is validate the SENDME. Any error leads to
315        // closing the circuit.
316        self.sendme_validator.validate(Some(tag))?;
317
318        // Update our RTT estimate if the algorithm yields back a congestion window. RTT
319        // measurements only make sense for a congestion window. For example, FixedWindow here
320        // doesn't use it and so no need for the RTT.
321        if let Some(cwnd) = self.algorithm.cwnd() {
322            self.rtt
323                .update(Instant::now(), &self.state, cwnd)
324                .map_err(|e| Error::CircProto(e.to_string()))?;
325        }
326
327        // Notify the algorithm that we've received a SENDME.
328        self.algorithm
329            .sendme_received(&mut self.state, &mut self.rtt, signals)
330    }
331
332    /// Called when a SENDME cell is sent.
333    pub(crate) fn note_sendme_sent(&mut self) -> Result<()> {
334        self.algorithm.sendme_sent()
335    }
336
337    /// Called when a DATA cell is received.
338    ///
339    /// Returns true iff a SENDME should be sent false otherwise. An error is returned if there is
340    /// a protocol violation with regards to flow or congestion control.
341    pub(crate) fn note_data_received(&mut self) -> Result<bool> {
342        self.algorithm.data_received()
343    }
344
345    /// Called when a DATA cell is sent.
346    ///
347    /// An error is returned if there is a protocol violation with regards to flow or congestion
348    /// control.
349    pub(crate) fn note_data_sent<U>(&mut self, tag: &U) -> Result<()>
350    where
351        U: Clone + Into<SendmeTag>,
352    {
353        // Inform the algorithm that the data was just sent. This is important to be the very first
354        // thing so the congestion window can be updated accordingly making the following calls
355        // using the latest data.
356        self.algorithm.data_sent()?;
357
358        // If next cell is a SENDME, we need to record the tag of this cell in order to validate
359        // the next SENDME when it arrives.
360        if self.algorithm.is_next_cell_sendme() {
361            self.sendme_validator.record(tag);
362            // Only keep the SENDME timestamp if the algorithm has a congestion window.
363            if self.algorithm.cwnd().is_some() {
364                self.rtt.expect_sendme(Instant::now());
365            }
366        }
367
368        Ok(())
369    }
370
371    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
372    ///
373    /// Optional, because not all algorithms track this.
374    #[cfg(feature = "conflux")]
375    pub(crate) fn inflight(&self) -> Option<u32> {
376        self.algorithm.inflight()
377    }
378
379    /// Return the congestion window object.
380    ///
381    /// Optional, because not all algorithms track this.
382    #[cfg(feature = "conflux")]
383    pub(crate) fn cwnd(&self) -> Option<&CongestionWindow> {
384        self.algorithm.cwnd()
385    }
386
387    /// Return a reference to the RTT estimator.
388    ///
389    /// Used for conflux, for choosing the best circuit to send on.
390    #[cfg(feature = "conflux")]
391    pub(crate) fn rtt(&self) -> &RoundtripTimeEstimator {
392        &self.rtt
393    }
394
395    /// Return the congestion control algorithm.
396    #[cfg(feature = "conflux")]
397    pub(crate) fn algorithm(&self) -> Algorithm {
398        self.algorithm.algorithm()
399    }
400}
401
402#[cfg(test)]
403mod test {
404    // @@ begin test lint list maintained by maint/add_warning @@
405    #![allow(clippy::bool_assert_comparison)]
406    #![allow(clippy::clone_on_copy)]
407    #![allow(clippy::dbg_macro)]
408    #![allow(clippy::mixed_attributes_style)]
409    #![allow(clippy::print_stderr)]
410    #![allow(clippy::print_stdout)]
411    #![allow(clippy::single_char_pattern)]
412    #![allow(clippy::unwrap_used)]
413    #![allow(clippy::unchecked_duration_subtraction)]
414    #![allow(clippy::useless_vec)]
415    #![allow(clippy::needless_pass_by_value)]
416    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
417
418    use crate::congestion::test_utils::new_cwnd;
419
420    use super::CongestionControl;
421    use tor_cell::relaycell::msg::SendmeTag;
422
423    impl CongestionControl {
424        /// For testing: get a copy of the current send window, and the
425        /// expected incoming tags.
426        pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
427            (
428                self.algorithm.send_window(),
429                self.sendme_validator.expected_tags(),
430            )
431        }
432    }
433
434    #[test]
435    fn test_cwnd() {
436        let mut cwnd = new_cwnd();
437
438        // Validate the getters are coherent with initialization.
439        assert_eq!(cwnd.get(), cwnd.params().cwnd_init());
440        assert_eq!(cwnd.min(), cwnd.params().cwnd_min());
441        assert_eq!(cwnd.increment(), cwnd.params().cwnd_inc());
442        assert_eq!(cwnd.increment_rate(), cwnd.params().cwnd_inc_rate());
443        assert_eq!(cwnd.sendme_inc(), cwnd.params().sendme_inc());
444        assert!(!cwnd.is_full());
445
446        // Validate changes.
447        cwnd.inc();
448        assert_eq!(
449            cwnd.get(),
450            cwnd.params().cwnd_init() + cwnd.params().cwnd_inc()
451        );
452        cwnd.dec();
453        assert_eq!(cwnd.get(), cwnd.params().cwnd_init());
454    }
455}