Tor  0.4.8.0-alpha-dev
congestion_control_flow.c
/* Copyright (c) 2019-2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file congestion_control_flow.c
 * \brief Code that implements flow control for congestion controlled
 *        circuits.
 */

#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE

#include "core/or/or.h"

#include "core/or/relay.h"
#include "core/mainloop/connection.h"
#include "core/or/connection_edge.h"
#include "core/mainloop/mainloop.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/congestion_control_st.h"
#include "core/or/circuitlist.h"
#include "core/or/trace_probes_cc.h"
#include "feature/control/control_events.h"
#include "trunnel/flow_control_cells.h"
#include "feature/nodelist/networkstatus.h"
#include "lib/math/stats.h"

#include "core/or/connection_st.h"
#include "core/or/cell_st.h"
#include "app/config/config.h"

/** Cached consensus parameters */
static uint32_t xoff_client;
static uint32_t xoff_exit;

static uint32_t xon_change_pct;
static uint32_t xon_ewma_cnt;
static uint32_t xon_rate_bytes;

/** MetricsPort stats */
uint64_t cc_stats_flow_num_xoff_sent;
uint64_t cc_stats_flow_num_xon_sent;
double cc_stats_flow_xoff_outbuf_ma = 0;
double cc_stats_flow_xon_outbuf_ma = 0;

/* In normal operation, we can get a burst of up to 32 cells before returning
 * to libevent to flush the outbuf. This is a heuristic from hardcoded values
 * and strange logic in connection_bucket_get_share(). */
#define MAX_EXPECTED_CELL_BURST 32

/* The following three are for dropmark rate limiting. They define when we
 * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
 * because it limits the ability of spurious XON/XOFF to be sent after large
 * amounts of data without XON/XOFF. At these limits, after 10MB of data (or
 * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
 * cells of fake XOFF/XON before the xmit byte count will be halved enough
 * to trigger a limit. */
#define XON_COUNT_SCALE_AT 200
#define XOFF_COUNT_SCALE_AT 200
#define ONE_MEGABYTE (UINT64_C(1) << 20)
#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
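
/* Editorial sketch, not part of the Tor source: a quick numeric check of
 * the "~1000 cells" estimate in the comment above, using the defaults
 * defined here (the helper name is hypothetical, and this is not compiled
 * into Tor). Each halving of the xmit byte count at the 200-count scale
 * point buys an adversary roughly 100 more spurious XON/XOFF cells before
 * a limit trips. */
static inline unsigned
dropmark_fake_cell_estimate(void)
{
  uint64_t bytes = TOTAL_XMIT_SCALE_AT;             /* 10 MB of real data */
  const uint64_t floor = XOFF_COUNT_SCALE_AT * 500; /* count * default xoff */
  unsigned cells = 0;
  /* log2(10MB) - log2(200*500) gives about 7 halvings before the floor. */
  while (bytes > floor) {
    bytes /= 2;
    cells += XOFF_COUNT_SCALE_AT / 2;               /* ~100 cells each */
  }
  return cells;  /* = 700, the same order as the ~1000 cited above */
}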

/**
 * Return the congestion control object of the given edge connection.
 *
 * Returns NULL if the edge connection is not attached to a circuit, has no
 * cpath_layer, or if neither the circuit nor the cpath_layer has a
 * congestion control object.
 */
static inline const congestion_control_t *
edge_get_ccontrol(const edge_connection_t *edge)
{
  congestion_control_t *ccontrol = NULL;

  if (edge->on_circuit && edge->on_circuit->ccontrol) {
    ccontrol = edge->on_circuit->ccontrol;
  } else if (edge->cpath_layer && edge->cpath_layer->ccontrol) {
    ccontrol = edge->cpath_layer->ccontrol;
  }

  return ccontrol;
}

/**
 * Update global congestion control related consensus parameter values, on
 * every consensus update.
 *
 * More details for each of the parameters can be found in proposal 324,
 * section 6.5, including tuning notes.
 */
void
flow_control_new_consensus_params(const networkstatus_t *ns)
{
#define CC_XOFF_CLIENT_DFLT 500
#define CC_XOFF_CLIENT_MIN 1
#define CC_XOFF_CLIENT_MAX 10000
  xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
                                        CC_XOFF_CLIENT_DFLT,
                                        CC_XOFF_CLIENT_MIN,
                                        CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE;

#define CC_XOFF_EXIT_DFLT 500
#define CC_XOFF_EXIT_MIN 1
#define CC_XOFF_EXIT_MAX 10000
  xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
                                      CC_XOFF_EXIT_DFLT,
                                      CC_XOFF_EXIT_MIN,
                                      CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE;

#define CC_XON_CHANGE_PCT_DFLT 25
#define CC_XON_CHANGE_PCT_MIN 1
#define CC_XON_CHANGE_PCT_MAX 99
  xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
                                           CC_XON_CHANGE_PCT_DFLT,
                                           CC_XON_CHANGE_PCT_MIN,
                                           CC_XON_CHANGE_PCT_MAX);

#define CC_XON_RATE_BYTES_DFLT (500)
#define CC_XON_RATE_BYTES_MIN (1)
#define CC_XON_RATE_BYTES_MAX (5000)
  xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate",
                                           CC_XON_RATE_BYTES_DFLT,
                                           CC_XON_RATE_BYTES_MIN,
                                           CC_XON_RATE_BYTES_MAX)
                   *RELAY_PAYLOAD_SIZE;

#define CC_XON_EWMA_CNT_DFLT (2)
#define CC_XON_EWMA_CNT_MIN (2)
#define CC_XON_EWMA_CNT_MAX (100)
  xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
                                         CC_XON_EWMA_CNT_DFLT,
                                         CC_XON_EWMA_CNT_MIN,
                                         CC_XON_EWMA_CNT_MAX);
}
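
/* Editorial sketch, not part of the Tor source (hypothetical helper): the
 * cc_xoff_* and cc_xon_rate parameters arrive from the consensus in units
 * of relay cells, and are converted to bytes above by multiplying with
 * RELAY_PAYLOAD_SIZE (498 bytes). With all defaults, that works out to: */
static inline void
flow_control_param_defaults_example(void)
{
  /* 500 cells * 498 bytes/cell = 249,000 buffered bytes before an XOFF. */
  uint32_t xoff_bytes = 500 * RELAY_PAYLOAD_SIZE;
  /* 500 cells * 498 bytes/cell = 249,000 bytes drained between
   * drain-rate updates (cc_xon_rate). */
  uint32_t xon_rate = 500 * RELAY_PAYLOAD_SIZE;
  (void)xoff_bytes;
  (void)xon_rate;
}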

/**
 * Send an XOFF for this stream, and note that we sent one.
 */
static void
circuit_send_stream_xoff(edge_connection_t *stream)
{
  xoff_cell_t xoff;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xoff_size;

  memset(&xoff, 0, sizeof(xoff));
  memset(payload, 0, sizeof(payload));

  xoff_cell_set_version(&xoff, 0);

  if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
    log_warn(LD_BUG, "Failed to encode xoff cell");
    return;
  }

  if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
                                   (char*)payload, (size_t)xoff_size) == 0) {
    stream->xoff_sent = true;
    cc_stats_flow_num_xoff_sent++;

    /* If this is an entry conn, notify control port */
    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
                                  STREAM_EVENT_XOFF_SENT,
                                  0);
    }
  }
}

/**
 * Compute the recent drain rate (write rate) for this edge
 * connection and return it, in KB/sec (1000 bytes/sec).
 *
 * Returns 0 if the monotime clock is busted.
 */
static inline uint32_t
compute_drain_rate(const edge_connection_t *stream)
{
  if (BUG(!is_monotime_clock_reliable())) {
    log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
    return 0;
  }

  uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;

  if (delta == 0) {
    log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
    return 0;
  }

  /* Overflow checks */
  if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
      stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
    return INT32_MAX;
  }

  /* KB/sec = bytes/usec * 1000000 usec/sec / (1000 bytes/KB) */
  return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
}
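
/* Editorial sketch, not part of the Tor source: a worked example of the
 * rate computation above, with hypothetical values. If a stream drained
 * 249,000 bytes over a 500,000 usec (0.5 sec) window, the rate is
 * 249000 * 1000 / 500000 = 498 KB/sec (where 1 KB = 1000 bytes). */
static inline uint32_t
compute_drain_rate_example(void)
{
  const uint64_t prev_drained_bytes = 249000;  /* bytes written */
  const uint64_t delta = 500000;               /* usec since drain start */
  return (uint32_t)(prev_drained_bytes * 1000 / delta);  /* 498 KB/sec */
}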

/**
 * Send an XON for this stream, with appropriate advisory rate information.
 *
 * Reverts the xoff sent status, and stores the rate information we sent,
 * in case it changes.
 */
static void
circuit_send_stream_xon(edge_connection_t *stream)
{
  xon_cell_t xon;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xon_size;

  memset(&xon, 0, sizeof(xon));
  memset(payload, 0, sizeof(payload));

  xon_cell_set_version(&xon, 0);
  xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate);

  if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) {
    log_warn(LD_BUG, "Failed to encode xon cell");
    return;
  }

  /* Store the advisory rate information, to send advisory updates if
   * it changes */
  stream->ewma_rate_last_sent = stream->ewma_drain_rate;

  if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload,
                                   (size_t)xon_size) == 0) {
    /* Revert the xoff sent status, so we can send another one if need be */
    stream->xoff_sent = false;

    cc_stats_flow_num_xon_sent++;

    /* If it's an entry conn, notify control port */
    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
                                  STREAM_EVENT_XON_SENT,
                                  0);
    }
  }
}

/**
 * Process a stream XOFF, parsing it, and then stopping reading on
 * the edge connection.
 *
 * Record that we have received an xoff, so we know not to resume
 * reading on this edge conn until we get an XON.
 *
 * Returns false if the XOFF did not validate; true if it did.
 */
bool
circuit_process_stream_xoff(edge_connection_t *conn,
                            const crypt_path_t *layer_hint,
                            const cell_t *cell)
{
  (void)cell;
  bool retval = true;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF on invalid stream?");
    return false;
  }

  /* Make sure this XOFF came from the right hop */
  if (layer_hint && layer_hint != conn->cpath_layer) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF from wrong hop.");
    return false;
  }

  if (edge_get_ccontrol(conn) == NULL) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF for non-congestion control circuit");
    return false;
  }

  if (conn->xoff_received) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got multiple XOFF on connection");
    return false;
  }

  /* If we are near the max, scale everything down */
  if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xoff_recv++;

  /* Client-side check to make sure that XOFF is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to make sure that we have not received more XOFFs
   * than could have been generated by the bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;
    if (conn->hs_ident)
      limit = xoff_client;
    else
      limit = xoff_exit;

    if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XOFF for bytes sent. Got %d, expected max %d",
             conn->num_xoff_recv, conn->total_bytes_xmit/limit);
      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  log_info(LD_EDGE, "Got XOFF!");
  connection_stop_reading(TO_CONN(conn));
  conn->xoff_received = true;

  /* If this is an entry conn, notify control port */
  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
                                STREAM_EVENT_XOFF_RECV,
                                0);
  }

  return retval;
}
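
/* Editorial sketch, not part of the Tor source (hypothetical helper and
 * values): the dropmark check above in concrete numbers. With the default
 * cc_xoff_client of 500 cells (249,000 bytes), the Nth XOFF is only
 * considered valid protocol data once at least N * 249,000 bytes have
 * been transmitted on the stream. */
static inline bool
xoff_is_early_example(uint64_t total_bytes_xmit, uint32_t num_xoff_recv)
{
  const uint32_t limit = 500 * 498;  /* default cc_xoff_client, in bytes */
  /* e.g. a second XOFF after only 300,000 bytes: 300000 < 2*249000,
   * so this XOFF is flagged as suspicious. */
  return total_bytes_xmit < (uint64_t)limit * num_xoff_recv;
}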

/**
 * Process a stream XON, and if it validates, clear the xoff
 * flag and resume reading on this edge connection.
 *
 * Also, use the provided rate information to rate limit
 * reading on this edge (or packaging from it onto
 * the circuit), to avoid XON/XOFF chatter.
 *
 * Returns true if the XON validates, false otherwise.
 */
bool
circuit_process_stream_xon(edge_connection_t *conn,
                           const crypt_path_t *layer_hint,
                           const cell_t *cell)
{
  xon_cell_t *xon;
  bool retval = true;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON on invalid stream?");
    return false;
  }

  /* Make sure this XON came from the right hop */
  if (layer_hint && layer_hint != conn->cpath_layer) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON from wrong hop.");
    return false;
  }

  if (edge_get_ccontrol(conn) == NULL) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON for non-congestion control circuit");
    return false;
  }

  if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE,
                     CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE) < 0) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Received malformed XON cell.");
    return false;
  }

  /* If we are near the max, scale everything down */
  if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xon_recv++;

  /* Client-side check to make sure that XON is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to see that we did not get more XONs than make
   * sense for the number of bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;

    if (conn->hs_ident)
      limit = MIN(xoff_client, xon_rate_bytes);
    else
      limit = MIN(xoff_exit, xon_rate_bytes);

    if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XON for bytes sent. Got %d, expected max %d",
             conn->num_xon_recv, conn->total_bytes_xmit/limit);

      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);

  /* Adjust the token bucket of this edge connection with the drain rate in
   * the XON. The rate is in KB/sec (1000 bytes/sec), so convert it to
   * bytes/sec. */
  uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
  if (rate == 0 || INT32_MAX < rate) {
    /* No rate, or out of range: disable rate limiting. */
    rate = INT32_MAX;
  }
  token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);

  if (conn->xoff_received) {
    /* Clear the fact that we got an XOFF, so that this edge can
     * start and stop reading normally */
    conn->xoff_received = false;
    connection_start_reading(TO_CONN(conn));
  }

  /* If this is an entry conn, notify control port */
  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
                                STREAM_EVENT_XON_RECV,
                                0);
  }

  xon_cell_free(xon);

  return retval;
}
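
/* Editorial sketch, not part of the Tor source (hypothetical helper): how
 * the advisory rate in an XON maps onto the token bucket above. The
 * kbps_ewma field is in KB/sec (1000 bytes/sec); a value of 0, or anything
 * that would overflow the bucket, disables rate limiting. */
static inline uint32_t
xon_rate_to_bucket_example(uint32_t kbps_ewma)
{
  uint64_t rate = (uint64_t)kbps_ewma * 1000;  /* KB/sec -> bytes/sec */
  if (rate == 0 || rate > INT32_MAX)
    rate = INT32_MAX;                          /* effectively unlimited */
  /* e.g. kbps_ewma = 498 gives a 498,000 bytes/sec read limit. */
  return (uint32_t)rate;
}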

/**
 * Called from sendme_stream_data_received(), when data arrives
 * from a circuit to our edge's outbuf, to decide if we need to send
 * an XOFF.
 *
 * Sends an XOFF once the outbuf exceeds the buffering limit, but always
 * accepts the data, so it returns 0 in all normal cases; we rely on the
 * oomkiller to handle streams that keep queueing regardless.
 *
 * Returns -1 only if the stream has no congestion control, which is a bug.
 */
int
flow_control_decide_xoff(edge_connection_t *stream)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
  uint32_t buffer_limit_xoff = 0;

  if (BUG(edge_get_ccontrol(stream) == NULL)) {
    log_err(LD_BUG, "Flow control called for non-congestion control circuit");
    return -1;
  }

  /* Onion services and clients are typically localhost edges, so they
   * need different buffering limits than exits do */
  if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
    buffer_limit_xoff = xoff_client;
  } else {
    buffer_limit_xoff = xoff_exit;
  }

  if (total_buffered > buffer_limit_xoff) {
    if (!stream->xoff_sent) {
      log_info(LD_EDGE, "Sending XOFF: %"TOR_PRIuSZ" %d",
               total_buffered, buffer_limit_xoff);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);

      cc_stats_flow_xoff_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xoff(stream);

      /* Clear the drain rate. It is considered wrong if we
       * got all the way to XOFF */
      stream->ewma_drain_rate = 0;
    }
  }

  /* If the outbuf has accumulated more than the expected burst limit of
   * cells, then assume it is not draining, and call decide_xon. We must
   * do this because writes only happen when the socket unblocks, so
   * we may not otherwise notice accumulation of data in the outbuf for
   * advisory XONs. */
  if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) {
    flow_control_decide_xon(stream, 0);
  }

  /* Flow control always takes more data; we rely on the oomkiller to
   * handle misbehavior. */
  return 0;
}

/**
 * Returns true if the stream's drain rate has changed significantly.
 *
 * Returns false if the monotime clock is stalled, or if we have
 * no previous drain rate information.
 */
static bool
stream_drain_rate_changed(const edge_connection_t *stream)
{
  if (!is_monotime_clock_reliable()) {
    return false;
  }

  if (!stream->ewma_rate_last_sent) {
    return false;
  }

  if (stream->ewma_drain_rate >
      (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    return true;
  }

  if (stream->ewma_drain_rate <
      (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    return true;
  }

  return false;
}
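
/* Editorial sketch, not part of the Tor source (hypothetical helper and
 * values): the +/- cc_xon_change_pct window above in concrete numbers.
 * With the default of 25%, a previously advertised rate of 400 KB/sec
 * only triggers an advisory XON update if the new EWMA rate rises above
 * 500 or falls below 300 KB/sec. */
static inline bool
drain_rate_changed_example(uint32_t ewma_drain_rate,
                           uint32_t ewma_rate_last_sent)
{
  const uint32_t pct = 25;  /* default cc_xon_change_pct */
  return ewma_drain_rate > (100 + pct) * (uint64_t)ewma_rate_last_sent / 100
      || ewma_drain_rate < (100 - pct) * (uint64_t)ewma_rate_last_sent / 100;
}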

/**
 * Called whenever we drain an edge connection outbuf by writing on
 * its socket, to decide if it is time to send an XON.
 *
 * The n_written parameter tells us how many bytes we have written
 * this time, which is used to compute the advisory drain rate fields.
 */
void
flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));

  /* Bounds check the number of drained bytes, and scale */
  if (stream->drained_bytes >= UINT32_MAX - n_written) {
    /* Cut the bytes in half, and move the start time up halfway to now
     * (if we have one). */
    stream->drained_bytes /= 2;

    if (stream->drain_start_usec) {
      uint64_t now = monotime_absolute_usec();

      stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
    }
  }

  /* Accumulate drained bytes since last rate computation */
  stream->drained_bytes += n_written;

  tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);

  /* Check for bad monotime clock and bytecount wrap */
  if (!is_monotime_clock_reliable()) {
    /* If the monotime clock ever goes wrong, the safest thing to do
     * is just clear our short-term rate info and wait for the clock to
     * become reliable again. */
    stream->drain_start_usec = 0;
    stream->drained_bytes = 0;
  } else {
    /* If we have no drain start timestamp, and we still have
     * remaining buffer, start the buffering counter */
    if (!stream->drain_start_usec && total_buffered > 0) {
      log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ,
                stream->ewma_rate_last_sent,
                stream->ewma_drain_rate,
                total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
                stream);
      stream->drain_start_usec = monotime_absolute_usec();
      stream->drained_bytes = 0;
    }
  }

  if (stream->drain_start_usec) {
    /* If we have spent enough time in a queued state, update our drain
     * rate. */
    if (stream->drained_bytes > xon_rate_bytes) {
      /* No previous drained bytes means it is the first time we are
       * computing it, so use the value we just drained onto the socket as
       * a baseline. It won't be accurate, but it will be a start towards
       * the right value.
       *
       * We have to do this in order to have a drain rate, else we could be
       * sending a drain rate of 0 in an XON, which would be undesirable and
       * basically like sending an XOFF. */
      if (stream->prev_drained_bytes == 0) {
        stream->prev_drained_bytes = stream->drained_bytes;
      }
      uint32_t drain_rate = compute_drain_rate(stream);
      /* Once the drain rate has been computed, note how many bytes we just
       * drained so it can be used at the next calculation. We do this here
       * because it gets reset once the rate is changed. */
      stream->prev_drained_bytes = stream->drained_bytes;

      if (drain_rate) {
        stream->ewma_drain_rate =
          (uint32_t)n_count_ewma(drain_rate,
                                 stream->ewma_drain_rate,
                                 xon_ewma_cnt);
        log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ,
                  drain_rate,
                  stream->ewma_drain_rate,
                  total_buffered);
        tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
                  stream, drain_rate);
        /* Reset recent byte counts. This prevents us from sending advisory
         * XONs more frequently than every xon_rate_bytes. */
        stream->drained_bytes = 0;
        stream->drain_start_usec = 0;
      }
    }
  }

  /* If we don't have an XOFF outstanding, consider updating an
   * old rate */
  if (!stream->xoff_sent) {
    if (stream_drain_rate_changed(stream)) {
      /* If we are still buffering and the rate changed, update
       * the advisory XON */
      log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ,
               stream->ewma_rate_last_sent,
               stream->ewma_drain_rate,
               total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);

      cc_stats_flow_xon_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xon(stream);
    }
  } else if (total_buffered == 0) {
    log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ,
             stream->ewma_rate_last_sent,
             stream->ewma_drain_rate,
             total_buffered);
    tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
    circuit_send_stream_xon(stream);
  }

  /* If the buffer has fully emptied, clear the drain timestamp,
   * so we count only bytes drained while the outbuf is empty. */
  if (total_buffered == 0) {
    stream->drain_start_usec = 0;

    /* After we've spent 'xon_rate_bytes' with the queue fully drained,
     * double any rate we sent. */
    if (stream->drained_bytes >= xon_rate_bytes &&
        stream->ewma_rate_last_sent) {
      stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);

      log_debug(LD_EDGE,
                "Queue empty for xon_rate_limit bytes: %d %d",
                stream->ewma_rate_last_sent,
                stream->ewma_drain_rate);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
      /* Reset the drained bytes count, but keep its value as the previous
       * count, so the drain rate calculation takes into account what was
       * actually drained the last time. */
      stream->prev_drained_bytes = stream->drained_bytes;
      stream->drained_bytes = 0;
    }
  }

  return;
}
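
/* Editorial sketch, not part of the Tor source: the drain-rate smoothing
 * above uses n_count_ewma() with N = cc_xon_ewma_cnt (default 2). Assuming
 * the N-EWMA form used by Tor's congestion control code, new = (2*curr +
 * (N-1)*prev) / (N+1), so a fresh sample carries weight 2/(N+1). */
static inline uint64_t
n_count_ewma_example(uint64_t curr, uint64_t prev, uint64_t N)
{
  /* With N = 2: new = (2*curr + prev) / 3. For example, curr = 600 and
   * prev = 300 give (1200 + 300) / 3 = 500 KB/sec. */
  return (2 * curr + (N - 1) * prev) / (N + 1);
}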

/**
 * Note that we packaged some data on this stream. Used to enforce
 * client-side dropmark limits.
 */
void
flow_control_note_sent_data(edge_connection_t *stream, size_t len)
{
  /* If we are near the max, scale everything down */
  if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
    log_info(LD_EDGE, "Scaling down for flow control xmit bytes: %d %d %d",
             stream->total_bytes_xmit,
             stream->num_xoff_recv,
             stream->num_xon_recv);

    stream->total_bytes_xmit /= 2;
    stream->num_xoff_recv /= 2;
    stream->num_xon_recv /= 2;
  }

  stream->total_bytes_xmit += len;
}

/** Returns true if an edge connection uses flow control */
bool
edge_uses_flow_control(const edge_connection_t *stream)
{
  bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) ||
             (stream->cpath_layer && stream->cpath_layer->ccontrol);

  /* All circuits with congestion control use flow control */
  return ret;
}

/**
 * Returns the max RTT for the circuit that carries this stream,
 * as observed by congestion control.
 */
uint64_t
edge_get_max_rtt(const edge_connection_t *stream)
{
  if (stream->on_circuit && stream->on_circuit->ccontrol)
    return stream->on_circuit->ccontrol->max_rtt_usec;
  else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
    return stream->cpath_layer->ccontrol->max_rtt_usec;

  return 0;
}

/** Returns true if a connection is an edge conn that uses flow control */
bool
conn_uses_flow_control(connection_t *conn)
{
  bool ret = false;

  if (CONN_IS_EDGE(conn)) {
    edge_connection_t *edge = TO_EDGE_CONN(conn);

    if (edge_uses_flow_control(edge)) {
      ret = true;
    }
  }

  return ret;
}