tor_proto/congestion.rs
1//! Congestion control subsystem.
2//!
3//! This object is attached to a circuit hop (CircHop) and controls the logic for the congestion
4//! control support of the Tor Network. It also manages the circuit level SENDME logic which is
5//! part of congestion control.
6//!
7//! # Implementation
8//!
//! This subsystem is notified whenever a DATA cell is received or sent. Each notification
//! updates the congestion control state so that the very important
//! [`can_send`](CongestionControl::can_send) function can accurately decide whether a DATA cell
//! can be sent or not.
13//!
//! Any part of the arti code that wants to send a DATA cell on the wire needs to call
//! [`can_send`](CongestionControl::can_send) beforehand, or else we risk leaving the circuit in
//! a protocol violation state.
17//!
//! Furthermore, as we receive and emit SENDMEs, it also has entry points for those two events in
//! order to update the state.
20
21#[cfg(any(test, feature = "testing"))]
22pub(crate) mod test_utils;
23
24mod fixed;
25pub mod params;
26mod rtt;
27pub(crate) mod sendme;
28mod vegas;
29
30use crate::{Error, Result};
31
32use self::{
33 params::{Algorithm, CongestionControlParams, CongestionWindowParams},
34 rtt::RoundtripTimeEstimator,
35 sendme::SendmeValidator,
36};
37use tor_cell::relaycell::msg::SendmeTag;
38use tor_rtcompat::{DynTimeProvider, SleepProvider};
39
40/// This trait defines what a congestion control algorithm must implement in order to interface
41/// with the circuit reactor.
42///
43/// Note that all functions informing the algorithm, as in not getters, return a Result meaning
44/// that on error, it means we can't recover or that there is a protocol violation. In both
45/// cases, the circuit MUST be closed.
46pub(crate) trait CongestionControlAlgorithm: Send + std::fmt::Debug {
47 /// Return true iff this algorithm uses stream level SENDMEs.
48 fn uses_stream_sendme(&self) -> bool;
49 /// Return true iff the next cell is expected to be a SENDME.
50 fn is_next_cell_sendme(&self) -> bool;
51 /// Return true iff a cell can be sent on the wire according to the congestion control
52 /// algorithm.
53 fn can_send(&self) -> bool;
54 /// Return the congestion window object. The reason is returns an Option is because not all
55 /// algorithm uses one and so we avoid acting on it if so.
56 fn cwnd(&self) -> Option<&CongestionWindow>;
57
58 /// Inform the algorithm that we just got a DATA cell.
59 ///
60 /// Return true if a SENDME should be sent immediately or false if not.
61 fn data_received(&mut self) -> Result<bool>;
62 /// Inform the algorithm that we just sent a DATA cell.
63 fn data_sent(&mut self) -> Result<()>;
64 /// Inform the algorithm that we've just received a SENDME.
65 ///
66 /// This is a core function because the algorithm massively update its state when receiving a
67 /// SENDME by using the RTT value and congestion signals.
68 fn sendme_received(
69 &mut self,
70 state: &mut State,
71 rtt: &mut RoundtripTimeEstimator,
72 signals: CongestionSignals,
73 ) -> Result<()>;
74 /// Inform the algorithm that we just sent a SENDME.
75 fn sendme_sent(&mut self) -> Result<()>;
76
77 /// Return the number of in-flight cells (sent but awaiting SENDME ack).
78 ///
79 /// Optional, because not all algorithms track this.
80 #[cfg(feature = "conflux")]
81 fn inflight(&self) -> Option<u32>;
82
83 /// Test Only: Return the congestion window.
84 #[cfg(test)]
85 fn send_window(&self) -> u32;
86
87 /// Return the congestion control [`Algorithm`] implemented by this type.
88 fn algorithm(&self) -> Algorithm;
89}
90
/// Congestion signals used by a congestion control algorithm to make decisions. These signals
/// are various states of our internals. This is not an exhaustive list.
#[derive(Copy, Clone)]
pub(crate) struct CongestionSignals {
    /// Indicate if the channel is blocked.
    pub(crate) channel_blocked: bool,
    /// The size of the channel outbound queue, saturated at `u32::MAX`.
    pub(crate) channel_outbound_size: u32,
}

impl CongestionSignals {
    /// Build the signals from the current channel state.
    ///
    /// `channel_outbound_size` is converted to `u32` with saturation: a size that does not fit
    /// is reported as `u32::MAX` rather than being truncated to its low 32 bits, which could
    /// make a huge queue look nearly empty.
    pub(crate) fn new(channel_blocked: bool, channel_outbound_size: usize) -> Self {
        Self {
            channel_blocked,
            channel_outbound_size: u32::try_from(channel_outbound_size).unwrap_or(u32::MAX),
        }
    }
}
110
/// The phase of congestion control that a circuit is currently in.
#[derive(Copy, Clone, Default)]
pub(crate) enum State {
    /// The phase every circuit starts in (this is the `Default`). Used to gradually increase
    /// the amount of data being transmitted in order to converge towards the optimal capacity.
    #[default]
    SlowStart,
    /// Steady phase representing what we think is optimal. This always comes after slow start.
    Steady,
}

impl State {
    /// Return true iff this is SlowStart.
    pub(crate) fn in_slow_start(&self) -> bool {
        match *self {
            State::SlowStart => true,
            State::Steady => false,
        }
    }
}
128
129/// A congestion window. This is generic for all algorithms but their parameters' value will differ
130/// depending on the selected algorithm.
131#[derive(Clone, Debug)]
132pub(crate) struct CongestionWindow {
133 /// Congestion window parameters from the consensus.
134 params: CongestionWindowParams,
135 /// The actual value of our congestion window.
136 value: u32,
137 /// The congestion window is full.
138 is_full: bool,
139}
140
141impl CongestionWindow {
142 /// Constructor taking consensus parameters.
143 fn new(params: &CongestionWindowParams) -> Self {
144 Self {
145 value: params.cwnd_init(),
146 params: params.clone(),
147 is_full: false,
148 }
149 }
150
151 /// Decrement the window by the increment value.
152 pub(crate) fn dec(&mut self) {
153 self.value = self
154 .value
155 .saturating_sub(self.increment())
156 .max(self.params.cwnd_min());
157 }
158
159 /// Increment the window by the increment value.
160 pub(crate) fn inc(&mut self) {
161 self.value = self
162 .value
163 .saturating_add(self.increment())
164 .min(self.params.cwnd_max());
165 }
166
167 /// Return the current value.
168 pub(crate) fn get(&self) -> u32 {
169 self.value
170 }
171
172 /// Return the expected rate for which the congestion window should be updated at.
173 ///
174 /// See `CWND_UPDATE_RATE` in prop324.
175 pub(crate) fn update_rate(&self, state: &State) -> u32 {
176 if state.in_slow_start() {
177 1
178 } else {
179 (self.get() + self.increment_rate() * self.sendme_inc() / 2)
180 / (self.increment_rate() * self.sendme_inc())
181 }
182 }
183
184 /// Return minimum value of the congestion window.
185 pub(crate) fn min(&self) -> u32 {
186 self.params.cwnd_min()
187 }
188
189 /// Set the congestion window value with a new value.
190 pub(crate) fn set(&mut self, value: u32) {
191 self.value = value;
192 }
193
194 /// Return the increment value.
195 pub(crate) fn increment(&self) -> u32 {
196 self.params.cwnd_inc()
197 }
198
199 /// Return the rate at which we should increment the window.
200 pub(crate) fn increment_rate(&self) -> u32 {
201 self.params.cwnd_inc_rate()
202 }
203
204 /// Return true iff this congestion window is full.
205 pub(crate) fn is_full(&self) -> bool {
206 self.is_full
207 }
208
209 /// Reset the full flag meaning it is now not full.
210 pub(crate) fn reset_full(&mut self) {
211 self.is_full = false;
212 }
213
214 /// Return the number of expected SENDMEs per congestion window.
215 ///
216 /// Spec: prop324 SENDME_PER_CWND definition
217 pub(crate) fn sendme_per_cwnd(&self) -> u32 {
218 (self.get() + (self.sendme_inc() / 2)) / self.sendme_inc()
219 }
220
221 /// Return the RFC3742 slow start increment value.
222 ///
223 /// Spec: prop324 rfc3742_ss_inc definition
224 pub(crate) fn rfc3742_ss_inc(&mut self, ss_cap: u32) -> u32 {
225 let inc = if self.get() <= ss_cap {
226 ((self.params.cwnd_inc_pct_ss().as_percent() * self.sendme_inc()) + 50) / 100
227 } else {
228 (((self.sendme_inc() * ss_cap) + self.get()) / (self.get() * 2)).max(1)
229 };
230 self.value += inc;
231 inc
232 }
233
234 /// Evaluate the fullness of the window with the given parameters.
235 ///
236 /// Spec: prop324 see cwnd_is_full and cwnd_is_nonfull definition.
237 /// C-tor: cwnd_became_full() and cwnd_became_nonfull()
238 pub(crate) fn eval_fullness(&mut self, inflight: u32, full_gap: u32, full_minpct: u32) {
239 if (inflight + (self.sendme_inc() * full_gap)) >= self.get() {
240 self.is_full = true;
241 } else if (100 * inflight) < (full_minpct * self.get()) {
242 self.is_full = false;
243 }
244 }
245
246 /// Return the SENDME increment value.
247 pub(crate) fn sendme_inc(&self) -> u32 {
248 self.params.sendme_inc()
249 }
250
251 /// Return the congestion window params.
252 #[cfg(any(test, feature = "conflux"))]
253 pub(crate) fn params(&self) -> &CongestionWindowParams {
254 &self.params
255 }
256}
257
258/// Congestion control state of a hop on a circuit.
259///
260/// This controls the entire logic of congestion control and circuit level SENDMEs.
261pub(crate) struct CongestionControl {
262 /// Which congestion control state are we in?
263 state: State,
264 /// This is the SENDME validator as in it keeps track of the circuit tag found within an
265 /// authenticated SENDME cell. It can store the tags and validate a tag against our queue of
266 /// expected values.
267 sendme_validator: SendmeValidator<SendmeTag>,
268 /// The RTT estimator for the circuit we are attached on.
269 rtt: RoundtripTimeEstimator,
270 /// The congestion control algorithm.
271 algorithm: Box<dyn CongestionControlAlgorithm>,
272}
273
274impl CongestionControl {
275 /// Construct a new CongestionControl
276 pub(crate) fn new(params: &CongestionControlParams) -> Self {
277 let state = State::default();
278 // Use what the consensus tells us to use.
279 let algorithm: Box<dyn CongestionControlAlgorithm> = match params.alg() {
280 Algorithm::FixedWindow(p) => Box::new(fixed::FixedWindow::new(*p)),
281 Algorithm::Vegas(ref p) => {
282 let cwnd = CongestionWindow::new(params.cwnd_params());
283 Box::new(vegas::Vegas::new(*p, &state, cwnd))
284 }
285 };
286 Self {
287 algorithm,
288 rtt: RoundtripTimeEstimator::new(params.rtt_params()),
289 sendme_validator: SendmeValidator::new(),
290 state,
291 }
292 }
293
294 /// Return true iff the underlying algorithm uses stream level SENDMEs.
295 /// At the moment, only FixedWindow uses it. It has been eliminated with Vegas.
296 pub(crate) fn uses_stream_sendme(&self) -> bool {
297 self.algorithm.uses_stream_sendme()
298 }
299
300 /// Return true iff a DATA cell is allowed to be sent based on the congestion control state.
301 pub(crate) fn can_send(&self) -> bool {
302 self.algorithm.can_send()
303 }
304
305 /// Called when a SENDME cell is received.
306 ///
307 /// An error is returned if there is a protocol violation with regards to congestion control.
308 pub(crate) fn note_sendme_received(
309 &mut self,
310 runtime: &DynTimeProvider,
311 tag: SendmeTag,
312 signals: CongestionSignals,
313 ) -> Result<()> {
314 // This MUST be the first thing that we do that is validate the SENDME. Any error leads to
315 // closing the circuit.
316 self.sendme_validator.validate(Some(tag))?;
317
318 let now = runtime.now();
319 // Update our RTT estimate if the algorithm yields back a congestion window. RTT
320 // measurements only make sense for a congestion window. For example, FixedWindow here
321 // doesn't use it and so no need for the RTT.
322 if let Some(cwnd) = self.algorithm.cwnd() {
323 self.rtt
324 .update(now, &self.state, cwnd)
325 .map_err(|e| Error::CircProto(e.to_string()))?;
326 }
327
328 // Notify the algorithm that we've received a SENDME.
329 self.algorithm
330 .sendme_received(&mut self.state, &mut self.rtt, signals)
331 }
332
333 /// Called when a SENDME cell is sent.
334 pub(crate) fn note_sendme_sent(&mut self) -> Result<()> {
335 self.algorithm.sendme_sent()
336 }
337
338 /// Called when a DATA cell is received.
339 ///
340 /// Returns true iff a SENDME should be sent false otherwise. An error is returned if there is
341 /// a protocol violation with regards to flow or congestion control.
342 pub(crate) fn note_data_received(&mut self) -> Result<bool> {
343 self.algorithm.data_received()
344 }
345
346 /// Called when a DATA cell is sent.
347 ///
348 /// An error is returned if there is a protocol violation with regards to flow or congestion
349 /// control.
350 pub(crate) fn note_data_sent<U>(&mut self, runtime: &DynTimeProvider, tag: &U) -> Result<()>
351 where
352 U: Clone + Into<SendmeTag>,
353 {
354 // Inform the algorithm that the data was just sent. This is important to be the very first
355 // thing so the congestion window can be updated accordingly making the following calls
356 // using the latest data.
357 self.algorithm.data_sent()?;
358
359 // If next cell is a SENDME, we need to record the tag of this cell in order to validate
360 // the next SENDME when it arrives.
361 if self.algorithm.is_next_cell_sendme() {
362 self.sendme_validator.record(tag);
363 // Only keep the SENDME timestamp if the algorithm has a congestion window.
364 if self.algorithm.cwnd().is_some() {
365 self.rtt.expect_sendme(runtime.now());
366 }
367 }
368
369 Ok(())
370 }
371
372 /// Return the number of in-flight cells (sent but awaiting SENDME ack).
373 ///
374 /// Optional, because not all algorithms track this.
375 #[cfg(feature = "conflux")]
376 pub(crate) fn inflight(&self) -> Option<u32> {
377 self.algorithm.inflight()
378 }
379
380 /// Return the congestion window object.
381 ///
382 /// Optional, because not all algorithms track this.
383 #[cfg(feature = "conflux")]
384 pub(crate) fn cwnd(&self) -> Option<&CongestionWindow> {
385 self.algorithm.cwnd()
386 }
387
388 /// Return a reference to the RTT estimator.
389 ///
390 /// Used for conflux, for choosing the best circuit to send on.
391 #[cfg(feature = "conflux")]
392 pub(crate) fn rtt(&self) -> &RoundtripTimeEstimator {
393 &self.rtt
394 }
395
396 /// Return the congestion control algorithm.
397 #[cfg(feature = "conflux")]
398 pub(crate) fn algorithm(&self) -> Algorithm {
399 self.algorithm.algorithm()
400 }
401}
402
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::congestion::test_utils::new_cwnd;

    use super::CongestionControl;
    use tor_cell::relaycell::msg::SendmeTag;

    impl CongestionControl {
        /// For testing: get a copy of the current send window, and the
        /// expected incoming tags.
        pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
            let window = self.algorithm.send_window();
            let tags = self.sendme_validator.expected_tags();
            (window, tags)
        }
    }

    /// Check the `CongestionWindow` getters against its parameters, then that one increment
    /// followed by one decrement brings the window back to its initial value.
    #[test]
    fn test_cwnd() {
        let mut cwnd = new_cwnd();

        // The parameters never change, so read the ones we need more than once up front.
        let init = cwnd.params().cwnd_init();
        let step = cwnd.params().cwnd_inc();

        // Validate the getters are coherent with initialization.
        assert_eq!(cwnd.get(), init);
        assert_eq!(cwnd.min(), cwnd.params().cwnd_min());
        assert_eq!(cwnd.increment(), step);
        assert_eq!(cwnd.increment_rate(), cwnd.params().cwnd_inc_rate());
        assert_eq!(cwnd.sendme_inc(), cwnd.params().sendme_inc());
        assert!(!cwnd.is_full());

        // Validate changes.
        cwnd.inc();
        assert_eq!(cwnd.get(), init + step);
        cwnd.dec();
        assert_eq!(cwnd.get(), init);
    }
}