tor_proto/congestion.rs
1//! Congestion control subsystem.
2//!
3//! This object is attached to a circuit hop (CircHop) and controls the logic for the congestion
4//! control support of the Tor Network. It also manages the circuit level SENDME logic which is
5//! part of congestion control.
6//!
7//! # Implementation
8//!
//! The basic idea of this subsystem is that it is notified whenever a DATA cell is received or
//! sent. This in turn updates the congestion control state so that the very important
//! [`can_send`](CongestionControl::can_send) function can accurately decide whether a DATA cell
//! can be sent or not.
13//!
//! Any part of the arti code that wants to send a DATA cell on the wire needs to call
//! [`can_send`](CongestionControl::can_send) beforehand, otherwise we risk leaving the circuit
//! in a protocol violation state.
17//!
//! Furthermore, as we receive and emit SENDMEs, this subsystem also has entry points for those
//! two events in order to update the state.
20
21#[cfg(any(test, feature = "testing"))]
22pub(crate) mod test_utils;
23
24mod fixed;
25pub mod params;
26mod rtt;
27pub(crate) mod sendme;
28mod vegas;
29
30use std::time::Instant;
31
32use crate::{Error, Result};
33
34use self::{
35 params::{Algorithm, CongestionControlParams, CongestionWindowParams},
36 rtt::RoundtripTimeEstimator,
37 sendme::SendmeValidator,
38};
39use tor_cell::relaycell::msg::SendmeTag;
40
41/// This trait defines what a congestion control algorithm must implement in order to interface
42/// with the circuit reactor.
43///
44/// Note that all functions informing the algorithm, as in not getters, return a Result meaning
45/// that on error, it means we can't recover or that there is a protocol violation. In both
46/// cases, the circuit MUST be closed.
47pub(crate) trait CongestionControlAlgorithm: Send + std::fmt::Debug {
48 /// Return true iff this algorithm uses stream level SENDMEs.
49 fn uses_stream_sendme(&self) -> bool;
50 /// Return true iff the next cell is expected to be a SENDME.
51 fn is_next_cell_sendme(&self) -> bool;
52 /// Return true iff a cell can be sent on the wire according to the congestion control
53 /// algorithm.
54 fn can_send(&self) -> bool;
55 /// Return the congestion window object. The reason is returns an Option is because not all
56 /// algorithm uses one and so we avoid acting on it if so.
57 fn cwnd(&self) -> Option<&CongestionWindow>;
58
59 /// Inform the algorithm that we just got a DATA cell.
60 ///
61 /// Return true if a SENDME should be sent immediately or false if not.
62 fn data_received(&mut self) -> Result<bool>;
63 /// Inform the algorithm that we just sent a DATA cell.
64 fn data_sent(&mut self) -> Result<()>;
65 /// Inform the algorithm that we've just received a SENDME.
66 ///
67 /// This is a core function because the algorithm massively update its state when receiving a
68 /// SENDME by using the RTT value and congestion signals.
69 fn sendme_received(
70 &mut self,
71 state: &mut State,
72 rtt: &mut RoundtripTimeEstimator,
73 signals: CongestionSignals,
74 ) -> Result<()>;
75 /// Inform the algorithm that we just sent a SENDME.
76 fn sendme_sent(&mut self) -> Result<()>;
77
78 /// Return the number of in-flight cells (sent but awaiting SENDME ack).
79 ///
80 /// Optional, because not all algorithms track this.
81 #[cfg(feature = "conflux")]
82 fn inflight(&self) -> Option<u32>;
83
84 /// Test Only: Return the congestion window.
85 #[cfg(test)]
86 fn send_window(&self) -> u32;
87
88 /// Return the congestion control [`Algorithm`] implemented by this type.
89 fn algorithm(&self) -> Algorithm;
90}
91
/// Congestion signals used by a congestion control algorithm to make decisions.
///
/// These signals are snapshots of various parts of our internal state. This is not an
/// exhaustive list.
#[derive(Copy, Clone)]
pub(crate) struct CongestionSignals {
    /// Indicate if the channel is blocked.
    pub(crate) channel_blocked: bool,
    /// The size of the channel outbound queue, clamped to `u32::MAX`.
    pub(crate) channel_outbound_size: u32,
}

impl CongestionSignals {
    /// Build a new set of signals.
    ///
    /// `channel_outbound_size` is received as a `usize` but stored as a `u32`. A value that
    /// does not fit is clamped to `u32::MAX` instead of being truncated, so that an extremely
    /// large queue is never reported as a small one.
    pub(crate) fn new(channel_blocked: bool, channel_outbound_size: usize) -> Self {
        Self {
            channel_blocked,
            // Saturating conversion: a plain `as u32` would silently drop the high bits.
            channel_outbound_size: u32::try_from(channel_outbound_size).unwrap_or(u32::MAX),
        }
    }
}
111
/// The phase the congestion control of a circuit is in.
#[derive(Copy, Clone, Default)]
pub(crate) enum State {
    /// Initial phase of every circuit. It is used to gradually increase the amount of data
    /// being transmitted in order to converge towards the optimal capacity.
    #[default]
    SlowStart,
    /// Steady phase representing what we think is optimal. It always comes after slow start.
    Steady,
}

impl State {
    /// Tell whether this state is `SlowStart`.
    pub(crate) fn in_slow_start(&self) -> bool {
        match self {
            State::SlowStart => true,
            State::Steady => false,
        }
    }
}
129
130/// A congestion window. This is generic for all algorithms but their parameters' value will differ
131/// depending on the selected algorithm.
132#[derive(Clone, Debug)]
133pub(crate) struct CongestionWindow {
134 /// Congestion window parameters from the consensus.
135 params: CongestionWindowParams,
136 /// The actual value of our congestion window.
137 value: u32,
138 /// The congestion window is full.
139 is_full: bool,
140}
141
142impl CongestionWindow {
143 /// Constructor taking consensus parameters.
144 fn new(params: &CongestionWindowParams) -> Self {
145 Self {
146 value: params.cwnd_init(),
147 params: params.clone(),
148 is_full: false,
149 }
150 }
151
152 /// Decrement the window by the increment value.
153 pub(crate) fn dec(&mut self) {
154 self.value = self
155 .value
156 .saturating_sub(self.increment())
157 .max(self.params.cwnd_min());
158 }
159
160 /// Increment the window by the increment value.
161 pub(crate) fn inc(&mut self) {
162 self.value = self
163 .value
164 .saturating_add(self.increment())
165 .min(self.params.cwnd_max());
166 }
167
168 /// Return the current value.
169 pub(crate) fn get(&self) -> u32 {
170 self.value
171 }
172
173 /// Return the expected rate for which the congestion window should be updated at.
174 ///
175 /// See `CWND_UPDATE_RATE` in prop324.
176 pub(crate) fn update_rate(&self, state: &State) -> u32 {
177 if state.in_slow_start() {
178 1
179 } else {
180 (self.get() + self.increment_rate() * self.sendme_inc() / 2)
181 / (self.increment_rate() * self.sendme_inc())
182 }
183 }
184
185 /// Return minimum value of the congestion window.
186 pub(crate) fn min(&self) -> u32 {
187 self.params.cwnd_min()
188 }
189
190 /// Set the congestion window value with a new value.
191 pub(crate) fn set(&mut self, value: u32) {
192 self.value = value;
193 }
194
195 /// Return the increment value.
196 pub(crate) fn increment(&self) -> u32 {
197 self.params.cwnd_inc()
198 }
199
200 /// Return the rate at which we should increment the window.
201 pub(crate) fn increment_rate(&self) -> u32 {
202 self.params.cwnd_inc_rate()
203 }
204
205 /// Return true iff this congestion window is full.
206 pub(crate) fn is_full(&self) -> bool {
207 self.is_full
208 }
209
210 /// Reset the full flag meaning it is now not full.
211 pub(crate) fn reset_full(&mut self) {
212 self.is_full = false;
213 }
214
215 /// Return the number of expected SENDMEs per congestion window.
216 ///
217 /// Spec: prop324 SENDME_PER_CWND definition
218 pub(crate) fn sendme_per_cwnd(&self) -> u32 {
219 (self.get() + (self.sendme_inc() / 2)) / self.sendme_inc()
220 }
221
222 /// Return the RFC3742 slow start increment value.
223 ///
224 /// Spec: prop324 rfc3742_ss_inc definition
225 pub(crate) fn rfc3742_ss_inc(&mut self, ss_cap: u32) -> u32 {
226 let inc = if self.get() <= ss_cap {
227 ((self.params.cwnd_inc_pct_ss().as_percent() * self.sendme_inc()) + 50) / 100
228 } else {
229 (((self.sendme_inc() * ss_cap) + self.get()) / (self.get() * 2)).max(1)
230 };
231 self.value += inc;
232 inc
233 }
234
235 /// Evaluate the fullness of the window with the given parameters.
236 ///
237 /// Spec: prop324 see cwnd_is_full and cwnd_is_nonfull definition.
238 /// C-tor: cwnd_became_full() and cwnd_became_nonfull()
239 pub(crate) fn eval_fullness(&mut self, inflight: u32, full_gap: u32, full_minpct: u32) {
240 if (inflight + (self.sendme_inc() * full_gap)) >= self.get() {
241 self.is_full = true;
242 } else if (100 * inflight) < (full_minpct * self.get()) {
243 self.is_full = false;
244 }
245 }
246
247 /// Return the SENDME increment value.
248 pub(crate) fn sendme_inc(&self) -> u32 {
249 self.params.sendme_inc()
250 }
251
252 /// Return the congestion window params.
253 #[cfg(any(test, feature = "conflux"))]
254 pub(crate) fn params(&self) -> &CongestionWindowParams {
255 &self.params
256 }
257}
258
259/// Congestion control state of a hop on a circuit.
260///
261/// This controls the entire logic of congestion control and circuit level SENDMEs.
262pub(crate) struct CongestionControl {
263 /// Which congestion control state are we in?
264 state: State,
265 /// This is the SENDME validator as in it keeps track of the circuit tag found within an
266 /// authenticated SENDME cell. It can store the tags and validate a tag against our queue of
267 /// expected values.
268 sendme_validator: SendmeValidator<SendmeTag>,
269 /// The RTT estimator for the circuit we are attached on.
270 rtt: RoundtripTimeEstimator,
271 /// The congestion control algorithm.
272 algorithm: Box<dyn CongestionControlAlgorithm>,
273}
274
275impl CongestionControl {
276 /// Construct a new CongestionControl
277 pub(crate) fn new(params: &CongestionControlParams) -> Self {
278 let state = State::default();
279 // Use what the consensus tells us to use.
280 let algorithm: Box<dyn CongestionControlAlgorithm> = match params.alg() {
281 Algorithm::FixedWindow(p) => Box::new(fixed::FixedWindow::new(*p)),
282 Algorithm::Vegas(ref p) => {
283 let cwnd = CongestionWindow::new(params.cwnd_params());
284 Box::new(vegas::Vegas::new(*p, &state, cwnd))
285 }
286 };
287 Self {
288 algorithm,
289 rtt: RoundtripTimeEstimator::new(params.rtt_params()),
290 sendme_validator: SendmeValidator::new(),
291 state,
292 }
293 }
294
295 /// Return true iff the underlying algorithm uses stream level SENDMEs.
296 /// At the moment, only FixedWindow uses it. It has been eliminated with Vegas.
297 pub(crate) fn uses_stream_sendme(&self) -> bool {
298 self.algorithm.uses_stream_sendme()
299 }
300
301 /// Return true iff a DATA cell is allowed to be sent based on the congestion control state.
302 pub(crate) fn can_send(&self) -> bool {
303 self.algorithm.can_send()
304 }
305
306 /// Called when a SENDME cell is received.
307 ///
308 /// An error is returned if there is a protocol violation with regards to congestion control.
309 pub(crate) fn note_sendme_received(
310 &mut self,
311 tag: SendmeTag,
312 signals: CongestionSignals,
313 ) -> Result<()> {
314 // This MUST be the first thing that we do that is validate the SENDME. Any error leads to
315 // closing the circuit.
316 self.sendme_validator.validate(Some(tag))?;
317
318 // Update our RTT estimate if the algorithm yields back a congestion window. RTT
319 // measurements only make sense for a congestion window. For example, FixedWindow here
320 // doesn't use it and so no need for the RTT.
321 if let Some(cwnd) = self.algorithm.cwnd() {
322 self.rtt
323 .update(Instant::now(), &self.state, cwnd)
324 .map_err(|e| Error::CircProto(e.to_string()))?;
325 }
326
327 // Notify the algorithm that we've received a SENDME.
328 self.algorithm
329 .sendme_received(&mut self.state, &mut self.rtt, signals)
330 }
331
332 /// Called when a SENDME cell is sent.
333 pub(crate) fn note_sendme_sent(&mut self) -> Result<()> {
334 self.algorithm.sendme_sent()
335 }
336
337 /// Called when a DATA cell is received.
338 ///
339 /// Returns true iff a SENDME should be sent false otherwise. An error is returned if there is
340 /// a protocol violation with regards to flow or congestion control.
341 pub(crate) fn note_data_received(&mut self) -> Result<bool> {
342 self.algorithm.data_received()
343 }
344
345 /// Called when a DATA cell is sent.
346 ///
347 /// An error is returned if there is a protocol violation with regards to flow or congestion
348 /// control.
349 pub(crate) fn note_data_sent<U>(&mut self, tag: &U) -> Result<()>
350 where
351 U: Clone + Into<SendmeTag>,
352 {
353 // Inform the algorithm that the data was just sent. This is important to be the very first
354 // thing so the congestion window can be updated accordingly making the following calls
355 // using the latest data.
356 self.algorithm.data_sent()?;
357
358 // If next cell is a SENDME, we need to record the tag of this cell in order to validate
359 // the next SENDME when it arrives.
360 if self.algorithm.is_next_cell_sendme() {
361 self.sendme_validator.record(tag);
362 // Only keep the SENDME timestamp if the algorithm has a congestion window.
363 if self.algorithm.cwnd().is_some() {
364 self.rtt.expect_sendme(Instant::now());
365 }
366 }
367
368 Ok(())
369 }
370
371 /// Return the number of in-flight cells (sent but awaiting SENDME ack).
372 ///
373 /// Optional, because not all algorithms track this.
374 #[cfg(feature = "conflux")]
375 pub(crate) fn inflight(&self) -> Option<u32> {
376 self.algorithm.inflight()
377 }
378
379 /// Return the congestion window object.
380 ///
381 /// Optional, because not all algorithms track this.
382 #[cfg(feature = "conflux")]
383 pub(crate) fn cwnd(&self) -> Option<&CongestionWindow> {
384 self.algorithm.cwnd()
385 }
386
387 /// Return a reference to the RTT estimator.
388 ///
389 /// Used for conflux, for choosing the best circuit to send on.
390 #[cfg(feature = "conflux")]
391 pub(crate) fn rtt(&self) -> &RoundtripTimeEstimator {
392 &self.rtt
393 }
394
395 /// Return the congestion control algorithm.
396 #[cfg(feature = "conflux")]
397 pub(crate) fn algorithm(&self) -> Algorithm {
398 self.algorithm.algorithm()
399 }
400}
401
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::congestion::test_utils::new_cwnd;

    use super::CongestionControl;
    use tor_cell::relaycell::msg::SendmeTag;

    impl CongestionControl {
        /// Test helper: return the current send window together with a copy of the tags we
        /// expect to receive.
        pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
            let window = self.algorithm.send_window();
            let tags = self.sendme_validator.expected_tags();
            (window, tags)
        }
    }

    /// Check the congestion window getters against its parameters, then check that one
    /// increment followed by one decrement brings the window back to its initial value.
    #[test]
    fn test_cwnd() {
        let mut window = new_cwnd();

        // Reference values, taken straight from the parameters.
        let init = window.params().cwnd_init();
        let step = window.params().cwnd_inc();

        // A fresh window must agree with what it was initialized from.
        assert_eq!(window.get(), init);
        assert_eq!(window.min(), window.params().cwnd_min());
        assert_eq!(window.increment(), step);
        assert_eq!(window.increment_rate(), window.params().cwnd_inc_rate());
        assert_eq!(window.sendme_inc(), window.params().sendme_inc());
        assert!(!window.is_full());

        // Growing by one step, then shrinking by one step.
        window.inc();
        assert_eq!(window.get(), init + step);
        window.dec();
        assert_eq!(window.get(), init);
    }
}
455}