1use super::{
8 params::{Algorithm, VegasParams},
9 rtt::RoundtripTimeEstimator,
10 CongestionControlAlgorithm, CongestionSignals, CongestionWindow, State,
11};
12use crate::Result;
13
14use tor_error::{error_report, internal};
15
16#[derive(Clone, Debug, Default)]
20pub(crate) struct BdpEstimator {
21 bdp: u32,
23}
24
25impl BdpEstimator {
26 fn get(&self) -> u32 {
28 self.bdp
29 }
30
31 fn update(
36 &mut self,
37 cwnd: &CongestionWindow,
38 rtt: &RoundtripTimeEstimator,
39 signals: &CongestionSignals,
40 ) {
41 if rtt.clock_stalled() {
43 self.bdp = if signals.channel_blocked {
44 cwnd.get()
47 .saturating_sub(signals.channel_outbound_size)
48 .max(cwnd.min())
49 } else {
50 cwnd.get()
51 };
52 } else {
53 let min_rtt_usec = rtt.min_rtt_usec().unwrap_or(u32::MAX);
59 let ewma_rtt_usec = rtt.ewma_rtt_usec().unwrap_or(u32::MAX);
60 self.bdp = cwnd
61 .get()
62 .saturating_mul(min_rtt_usec)
63 .saturating_div(ewma_rtt_usec);
64 }
65 }
66}
67
68#[derive(Clone, Debug)]
78pub(crate) struct Vegas {
79 params: VegasParams,
81 bdp: BdpEstimator,
84 cwnd: CongestionWindow,
87 num_cell_until_sendme: u32,
89 num_sendme_until_cwnd_update: u32,
92 num_sendme_per_cwnd: u32,
95 num_inflight: u32,
98 is_blocked_on_chan: bool,
102}
103
104impl Vegas {
105 pub(crate) fn new(params: VegasParams, state: &State, cwnd: CongestionWindow) -> Self {
107 Self {
108 params,
109 bdp: BdpEstimator::default(),
110 num_cell_until_sendme: cwnd.sendme_inc(),
111 num_inflight: 0,
112 num_sendme_per_cwnd: 0,
113 num_sendme_until_cwnd_update: cwnd.update_rate(state),
114 cwnd,
115 is_blocked_on_chan: false,
116 }
117 }
118}
119
120impl CongestionControlAlgorithm for Vegas {
121 fn uses_stream_sendme(&self) -> bool {
122 false
124 }
125
126 fn uses_xon_xoff(&self) -> bool {
127 true
128 }
129
130 fn is_next_cell_sendme(&self) -> bool {
131 self.num_inflight % self.cwnd.sendme_inc() == 0
134 }
135
136 fn can_send(&self) -> bool {
137 self.num_inflight < self.cwnd.get()
138 }
139
140 fn cwnd(&self) -> Option<&CongestionWindow> {
141 Some(&self.cwnd)
142 }
143
144 fn sendme_received(
155 &mut self,
156 state: &mut State,
157 rtt: &mut RoundtripTimeEstimator,
158 signals: CongestionSignals,
159 ) -> Result<()> {
160 self.num_sendme_until_cwnd_update = self.num_sendme_until_cwnd_update.saturating_sub(1);
162 self.num_sendme_per_cwnd = self.num_sendme_per_cwnd.saturating_sub(1);
164
165 self.bdp.update(&self.cwnd, rtt, &signals);
172
173 if rtt.is_ready() {
176 if signals.channel_blocked {
177 if !self.is_blocked_on_chan {
180 self.num_sendme_until_cwnd_update = 0;
181 }
182 } else {
183 if self.is_blocked_on_chan {
186 self.num_sendme_until_cwnd_update = 0;
187 }
188 }
189 }
190 self.is_blocked_on_chan = signals.channel_blocked;
191
192 if !rtt.is_ready() && !self.is_blocked_on_chan {
194 debug_assert!(self.num_inflight >= self.cwnd.sendme_inc());
198 self.num_inflight = self.num_inflight.saturating_sub(self.cwnd.sendme_inc());
199 return Ok(());
200 }
201
202 let queue_use = self.cwnd.get().saturating_sub(self.bdp.get());
205
206 self.cwnd.eval_fullness(
208 self.num_inflight,
209 self.params.cwnd_full_gap(),
210 self.params.cwnd_full_min_pct().as_percent(),
211 );
212
213 if state.in_slow_start() {
215 if queue_use < self.params.cell_in_queue_params().gamma() && !self.is_blocked_on_chan {
216 if self.cwnd.is_full() {
218 let inc = self
220 .cwnd
221 .rfc3742_ss_inc(self.params.cell_in_queue_params().ss_cwnd_cap());
222
223 if (inc * self.cwnd.sendme_per_cwnd())
226 <= (self.cwnd.increment() * self.cwnd.increment_rate())
227 {
228 *state = State::Steady;
229 }
230 }
231 } else {
232 self.cwnd
234 .set(self.bdp.get() + self.params.cell_in_queue_params().gamma());
235 *state = State::Steady;
237 }
238
239 if self.cwnd.get() >= self.params.ss_cwnd_max() {
241 self.cwnd.set(self.params.ss_cwnd_max());
242 *state = State::Steady;
243 }
244 } else if self.num_sendme_until_cwnd_update == 0 {
245 if queue_use > self.params.cell_in_queue_params().delta() {
247 self.cwnd.set(
249 self.bdp.get() + self.params.cell_in_queue_params().delta()
250 - self.cwnd.increment(),
251 );
252 } else if queue_use > self.params.cell_in_queue_params().beta()
253 || self.is_blocked_on_chan
254 {
255 self.cwnd.dec();
257 } else if self.cwnd.is_full() && queue_use < self.params.cell_in_queue_params().alpha()
258 {
259 self.cwnd.inc();
261 }
262 }
263
264 if self.num_sendme_until_cwnd_update == 0 {
266 self.num_sendme_until_cwnd_update = self.cwnd.update_rate(state);
267 }
268 if self.num_sendme_per_cwnd == 0 {
269 self.num_sendme_per_cwnd = self.cwnd.sendme_per_cwnd();
270 }
271
272 if self.params.cwnd_full_per_cwnd() != 0 {
274 if self.num_sendme_per_cwnd == self.cwnd.sendme_per_cwnd() {
275 self.cwnd.reset_full();
276 }
277 } else if self.num_sendme_until_cwnd_update == self.cwnd.update_rate(state) {
278 self.cwnd.reset_full();
279 }
280
281 self.num_inflight = self.num_inflight.saturating_sub(self.cwnd.sendme_inc());
283 Ok(())
284 }
285
286 fn sendme_sent(&mut self) -> Result<()> {
287 self.num_cell_until_sendme = self.cwnd.sendme_inc();
289 Ok(())
290 }
291
292 fn data_received(&mut self) -> Result<bool> {
293 if self.num_cell_until_sendme == 0 {
294 error_report!(internal!("Congestion control unexptected data cell"), "");
298 return Ok(false);
299 }
300
301 self.num_cell_until_sendme = self.num_cell_until_sendme.saturating_sub(1);
303
304 Ok(self.num_cell_until_sendme == 0)
307 }
308
309 fn data_sent(&mut self) -> Result<()> {
310 self.num_inflight = self.num_inflight.saturating_add(1);
312 Ok(())
313 }
314
315 #[cfg(feature = "conflux")]
316 fn inflight(&self) -> Option<u32> {
317 Some(self.num_inflight)
318 }
319
320 #[cfg(test)]
321 fn send_window(&self) -> u32 {
322 self.cwnd.get()
323 }
324
325 fn algorithm(&self) -> Algorithm {
326 Algorithm::Vegas(self.params)
327 }
328}
329
// Test-vector driven tests for the Vegas algorithm, plus helpers shared with other tests.
#[cfg(test)]
#[allow(clippy::print_stderr)]
pub(crate) mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use std::{
        collections::VecDeque,
        time::{Duration, Instant},
    };
    use tor_units::Percentage;

    use super::*;
    use crate::congestion::{
        params::VegasParamsBuilder,
        test_utils::{new_cwnd, new_rtt_estimator},
    };

    /// Test-only accessors to the internal state of [`Vegas`].
    impl Vegas {
        /// Overwrite the number of inflight cells.
        pub(crate) fn set_inflight(&mut self, v: u32) {
            self.num_inflight = v;
        }
        /// Return whether the channel is recorded as blocked.
        fn is_blocked_on_chan(&self) -> bool {
            self.is_blocked_on_chan
        }
        /// Overwrite the recorded blocked state of the channel.
        fn set_is_blocked_on_chan(&mut self, v: bool) {
            self.is_blocked_on_chan = v;
        }
    }

    /// One test vector: fields ending in `_in` are fed to the algorithm, fields ending in `_out`
    /// are the values expected once the SENDME has been processed.
    #[derive(Debug)]
    struct TestVectorParams {
        /// Offset, in microseconds, at which the RTT estimator is told to expect a SENDME.
        sent_usec_in: u64,
        /// Offset, in microseconds, at which the RTT estimator is updated.
        got_sendme_usec_in: u64,
        /// Whether the channel is blocked.
        or_conn_blocked_in: bool,
        /// Number of inflight cells.
        inflight_in: u32,
        /// Expected EWMA RTT, in microseconds.
        ewma_rtt_usec_out: u32,
        /// Expected minimum RTT, in microseconds.
        min_rtt_usec_out: u32,
        /// Expected congestion window.
        cwnd_out: u32,
        /// Whether slow start is expected to still be in effect.
        in_slow_start_out: bool,
        /// Whether the congestion window is expected to be full.
        cwnd_full_out: bool,
        /// Whether the channel is expected to be recorded as blocked.
        blocked_chan_out: bool,
    }

    impl From<[u32; 10]> for TestVectorParams {
        /// Map the ten columns of a raw vector onto the named fields, in declaration order.
        /// Boolean columns are true when the value is exactly 1.
        fn from(arr: [u32; 10]) -> Self {
            Self {
                sent_usec_in: u64::from(arr[0]),
                got_sendme_usec_in: u64::from(arr[1]),
                or_conn_blocked_in: arr[2] == 1,
                inflight_in: arr[3],
                ewma_rtt_usec_out: arr[4],
                min_rtt_usec_out: arr[5],
                cwnd_out: arr[6],
                in_slow_start_out: arr[7] == 1,
                cwnd_full_out: arr[8] == 1,
                blocked_chan_out: arr[9] == 1,
            }
        }
    }

    /// Harness running a sequence of test vectors against a single Vegas instance.
    struct VegasTest {
        /// Vectors still to be run, in order.
        params: VecDeque<TestVectorParams>,
        /// RTT estimator shared by all vectors of the sequence.
        rtt: RoundtripTimeEstimator,
        /// Congestion control state shared by all vectors of the sequence.
        state: State,
        /// The algorithm under test.
        vegas: Vegas,
    }

    impl VegasTest {
        /// Build a harness from raw vectors, with a fresh RTT estimator, state and window.
        fn new(vec: Vec<[u32; 10]>) -> Self {
            let params = vec.into_iter().map(TestVectorParams::from).collect();
            let state = State::default();
            Self {
                params,
                rtt: new_rtt_estimator(),
                // `state` is borrowed here, before being moved into the harness below.
                vegas: Vegas::new(build_vegas_params(), &state, new_cwnd()),
                state,
            }
        }

        /// Apply the inputs of one vector, process a SENDME and check every expected output.
        fn run_once(&mut self, p: &TestVectorParams) {
            eprintln!("Testing vector: {:?}", p);
            // Force the algorithm state described by the vector.
            self.vegas.set_inflight(p.inflight_in);
            self.vegas.set_is_blocked_on_chan(p.or_conn_blocked_in);

            // Feed the RTT estimator with the vector's send and receive times.
            let now = Instant::now();
            self.rtt
                .expect_sendme(now + Duration::from_micros(p.sent_usec_in));
            let ret = self.rtt.update(
                now + Duration::from_micros(p.got_sendme_usec_in),
                &self.state,
                self.vegas.cwnd().expect("No CWND"),
            );
            assert!(ret.is_ok());

            let signals = CongestionSignals::new(p.or_conn_blocked_in, 0);
            let ret = self
                .vegas
                .sendme_received(&mut self.state, &mut self.rtt, signals);
            assert!(ret.is_ok());

            assert_eq!(self.rtt.ewma_rtt_usec().unwrap(), p.ewma_rtt_usec_out);
            assert_eq!(self.rtt.min_rtt_usec().unwrap(), p.min_rtt_usec_out);
            assert_eq!(self.vegas.cwnd().expect("No CWND").get(), p.cwnd_out);
            assert_eq!(
                self.vegas.cwnd().expect("No CWND").is_full(),
                p.cwnd_full_out
            );
            assert_eq!(self.state.in_slow_start(), p.in_slow_start_out);
            assert_eq!(self.vegas.is_blocked_on_chan(), p.blocked_chan_out);
        }

        /// Run all the remaining vectors in order, consuming them.
        fn run(&mut self) {
            while let Some(param) = self.params.pop_front() {
                self.run_once(&param);
            }
        }
    }

    /// Build the Vegas parameters used by the test vectors.
    pub(crate) fn build_vegas_params() -> VegasParams {
        const OUTBUF_CELLS: u32 = 62;
        VegasParamsBuilder::default()
            .cell_in_queue_params(
                // NOTE(review): presumably (alpha, beta, delta, gamma, ss_cwnd_cap); confirm
                // against the tuple conversion of the queue parameters.
                (
                    3 * OUTBUF_CELLS,
                    4 * OUTBUF_CELLS,
                    5 * OUTBUF_CELLS,
                    3 * OUTBUF_CELLS,
                    600,
                )
                    .into(),
            )
            .ss_cwnd_max(5_000)
            .cwnd_full_gap(4)
            .cwnd_full_min_pct(Percentage::new(25))
            .cwnd_full_per_cwnd(1)
            .build()
            .expect("Unable to build Vegas parameters")
    }

    /// Run the four sequences of test vectors, each against a fresh harness.
    #[test]
    fn test_vectors() {
        let vec1 = vec![
            [100000, 200000, 0, 124, 100000, 100000, 155, 1, 0, 0],
            [200000, 300000, 0, 155, 100000, 100000, 186, 1, 1, 0],
            [350000, 500000, 0, 186, 133333, 100000, 217, 1, 1, 0],
            [500000, 550000, 0, 217, 77777, 77777, 248, 1, 1, 0],
            [600000, 700000, 0, 248, 92592, 77777, 279, 1, 1, 0],
            [700000, 750000, 0, 279, 64197, 64197, 310, 1, 0, 0],
            [750000, 875000, 0, 310, 104732, 64197, 341, 1, 1, 0],
            [875000, 900000, 0, 341, 51577, 51577, 372, 1, 1, 0],
            [900000, 950000, 0, 279, 50525, 50525, 403, 1, 1, 0],
            [950000, 1000000, 0, 279, 50175, 50175, 434, 1, 1, 0],
            [1000000, 1050000, 0, 279, 50058, 50058, 465, 1, 1, 0],
            [1050000, 1100000, 0, 279, 50019, 50019, 496, 1, 1, 0],
            [1100000, 1150000, 0, 279, 50006, 50006, 527, 1, 1, 0],
            [1150000, 1200000, 0, 279, 50002, 50002, 558, 1, 1, 0],
            [1200000, 1250000, 0, 550, 50000, 50000, 589, 1, 1, 0],
            [1250000, 1300000, 0, 550, 50000, 50000, 620, 1, 0, 0],
            [1300000, 1350000, 0, 550, 50000, 50000, 635, 1, 1, 0],
            [1350000, 1400000, 0, 550, 50000, 50000, 650, 1, 1, 0],
            [1400000, 1450000, 0, 150, 50000, 50000, 650, 1, 0, 0],
            [1450000, 1500000, 0, 150, 50000, 50000, 650, 1, 0, 0],
            [1500000, 1550000, 0, 550, 50000, 50000, 664, 1, 1, 0],
            [1500000, 1600000, 0, 550, 83333, 50000, 584, 0, 1, 0],
            [1600000, 1650000, 0, 550, 61111, 50000, 585, 0, 1, 0],
            [1650000, 1700000, 0, 550, 53703, 50000, 586, 0, 1, 0],
            [1700000, 1750000, 0, 100, 51234, 50000, 586, 0, 0, 0],
            [1750000, 1900000, 0, 100, 117078, 50000, 559, 0, 0, 0],
            [1900000, 2000000, 0, 100, 105692, 50000, 558, 0, 0, 0],
            [2000000, 2075000, 0, 500, 85230, 50000, 558, 0, 1, 0],
            [2075000, 2125000, 1, 500, 61743, 50000, 557, 0, 1, 1],
            [2125000, 2150000, 0, 500, 37247, 37247, 558, 0, 1, 0],
            [2150000, 2350000, 0, 500, 145749, 37247, 451, 0, 1, 0],
        ];
        VegasTest::new(vec1).run();

        let vec2 = vec![
            [100000, 200000, 0, 124, 100000, 100000, 155, 1, 0, 0],
            [200000, 300000, 0, 155, 100000, 100000, 186, 1, 1, 0],
            [350000, 500000, 0, 186, 133333, 100000, 217, 1, 1, 0],
            [500000, 550000, 1, 217, 77777, 77777, 403, 0, 1, 1],
            [600000, 700000, 0, 248, 92592, 77777, 404, 0, 1, 0],
            [700000, 750000, 1, 404, 64197, 64197, 403, 0, 0, 1],
            [750000, 875000, 0, 403, 104732, 64197, 404, 0, 1, 0],
        ];
        VegasTest::new(vec2).run();

        let vec3 = vec![
            [18258527, 19002938, 0, 83, 744411, 744411, 155, 1, 0, 0],
            [18258580, 19254257, 0, 52, 911921, 744411, 186, 1, 1, 0],
            [20003224, 20645298, 0, 164, 732023, 732023, 217, 1, 1, 0],
            [20003367, 21021444, 0, 133, 922725, 732023, 248, 1, 1, 0],
            [20003845, 21265508, 0, 102, 1148683, 732023, 279, 1, 1, 0],
            [20003975, 21429157, 0, 71, 1333015, 732023, 310, 1, 0, 0],
            [20004309, 21707677, 0, 40, 1579917, 732023, 310, 1, 0, 0],
        ];
        VegasTest::new(vec3).run();

        let vec4 = vec![
            [358297091, 358854163, 0, 83, 557072, 557072, 155, 1, 0, 0],
            [358297649, 359123845, 0, 52, 736488, 557072, 186, 1, 1, 0],
            [359492879, 359995330, 0, 186, 580463, 557072, 217, 1, 1, 0],
            [359493043, 360489243, 0, 217, 857621, 557072, 248, 1, 1, 0],
            [359493232, 360489673, 0, 248, 950167, 557072, 279, 1, 1, 0],
            [359493795, 360489971, 0, 279, 980839, 557072, 310, 1, 0, 0],
            [359493918, 360490248, 0, 310, 991166, 557072, 341, 1, 1, 0],
            [359494029, 360716465, 0, 341, 1145346, 557072, 372, 1, 1, 0],
            [359996888, 360948867, 0, 372, 1016434, 557072, 403, 1, 1, 0],
            [359996979, 360949330, 0, 403, 973712, 557072, 434, 1, 1, 0],
            [360489528, 361113615, 0, 434, 740628, 557072, 465, 1, 1, 0],
            [360489656, 361281604, 0, 465, 774841, 557072, 496, 1, 1, 0],
            [360489837, 361500461, 0, 496, 932029, 557072, 482, 0, 1, 0],
            [360489963, 361500631, 0, 482, 984455, 557072, 482, 0, 1, 0],
            [360490117, 361842481, 0, 482, 1229727, 557072, 481, 0, 1, 0],
        ];
        VegasTest::new(vec4).run();
    }
}