Tor 0.4.9.0-alpha-dev
congestion_control_flow.c
Go to the documentation of this file.
1/* Copyright (c) 2019-2021, The Tor Project, Inc. */
2/* See LICENSE for licensing information */
3
4/**
5 * \file congestion_control_flow.c
6 * \brief Code that implements flow control for congestion controlled
7 * circuits.
8 */
9
10#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
11
12#include "core/or/or.h"
13
14#include "core/or/relay.h"
21#include "core/or/circuitlist.h"
22#include "core/or/trace_probes_cc.h"
24#include "trunnel/flow_control_cells.h"
26#include "lib/math/stats.h"
27
29#include "core/or/cell_st.h"
30#include "app/config/config.h"
32
33/** Cache consensus parameters */
34static uint32_t xoff_client;
35static uint32_t xoff_exit;
36
37static uint32_t xon_change_pct;
38static uint32_t xon_ewma_cnt;
39static uint32_t xon_rate_bytes;
40
41/** Metricsport stats */
43uint64_t cc_stats_flow_num_xon_sent;
44double cc_stats_flow_xoff_outbuf_ma = 0;
45double cc_stats_flow_xon_outbuf_ma = 0;
46
47/* In normal operation, we can get a burst of up to 32 cells before returning
48 * to libevent to flush the outbuf. This is a heuristic from hardcoded values
49 * and strange logic in connection_bucket_get_share(). */
50#define MAX_EXPECTED_CELL_BURST 32
51
52/* The following three are for dropmark rate limiting. They define when we
53 * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
54 * because it limits the ability of spurious XON/XOFF to be sent after large
55 * amounts of data without XON/XOFF. At these limits, after 10MB of data (or
56 * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
57 * cells of fake XOFF/XON before the xmit byte count will be halved enough to
58 * triggering a limit. */
59#define XON_COUNT_SCALE_AT 200
60#define XOFF_COUNT_SCALE_AT 200
61#define ONE_MEGABYTE (UINT64_C(1) << 20)
62#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
63
64/**
65 * Update global congestion control related consensus parameter values, every
66 * consensus update.
67 *
68 * More details for each of the parameters can be found in proposal 324,
69 * section 6.5 including tuning notes.
70 */
71void
73{
74#define CC_XOFF_CLIENT_DFLT 500
75#define CC_XOFF_CLIENT_MIN 1
76#define CC_XOFF_CLIENT_MAX 10000
77 xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
78 CC_XOFF_CLIENT_DFLT,
79 CC_XOFF_CLIENT_MIN,
80 CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE;
81
82#define CC_XOFF_EXIT_DFLT 500
83#define CC_XOFF_EXIT_MIN 1
84#define CC_XOFF_EXIT_MAX 10000
85 xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
86 CC_XOFF_EXIT_DFLT,
87 CC_XOFF_EXIT_MIN,
88 CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE;
89
90#define CC_XON_CHANGE_PCT_DFLT 25
91#define CC_XON_CHANGE_PCT_MIN 1
92#define CC_XON_CHANGE_PCT_MAX 99
93 xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
94 CC_XON_CHANGE_PCT_DFLT,
95 CC_XON_CHANGE_PCT_MIN,
96 CC_XON_CHANGE_PCT_MAX);
97
98#define CC_XON_RATE_BYTES_DFLT (500)
99#define CC_XON_RATE_BYTES_MIN (1)
100#define CC_XON_RATE_BYTES_MAX (5000)
101 xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate",
102 CC_XON_RATE_BYTES_DFLT,
103 CC_XON_RATE_BYTES_MIN,
104 CC_XON_RATE_BYTES_MAX)*RELAY_PAYLOAD_SIZE;
105
106#define CC_XON_EWMA_CNT_DFLT (2)
107#define CC_XON_EWMA_CNT_MIN (2)
108#define CC_XON_EWMA_CNT_MAX (100)
109 xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
110 CC_XON_EWMA_CNT_DFLT,
111 CC_XON_EWMA_CNT_MIN,
112 CC_XON_EWMA_CNT_MAX);
113}
114
115/**
116 * Send an XOFF for this stream, and note that we sent one
117 */
118static void
120{
121 xoff_cell_t xoff;
122 uint8_t payload[CELL_PAYLOAD_SIZE];
123 ssize_t xoff_size;
124
125 memset(&xoff, 0, sizeof(xoff));
126 memset(payload, 0, sizeof(payload));
127
128 xoff_cell_set_version(&xoff, 0);
129
130 if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
131 log_warn(LD_BUG, "Failed to encode xon cell");
132 return;
133 }
134
135 if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
136 (char*)payload, (size_t)xoff_size) == 0) {
137 stream->xoff_sent = true;
139
140 /* If this is an entry conn, notify control port */
141 if (TO_CONN(stream)->type == CONN_TYPE_AP) {
143 STREAM_EVENT_XOFF_SENT,
144 0);
145 }
146 }
147}
148
149/**
150 * Compute the recent drain rate (write rate) for this edge
151 * connection and return it, in KB/sec (1000 bytes/sec).
152 *
153 * Returns 0 if the monotime clock is busted.
154 */
155static inline uint32_t
157{
158 if (BUG(!is_monotime_clock_reliable())) {
159 log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
160 return 0;
161 }
162
163 uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;
164
165 if (delta == 0) {
166 log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
167 return 0;
168 }
169
170 /* Overflow checks */
171 if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
172 stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
173 return INT32_MAX;
174 }
175
176 /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */
177 return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
178}
179
180/**
181 * Send an XON for this stream, with appropriate advisory rate information.
182 *
183 * Reverts the xoff sent status, and stores the rate information we sent,
184 * in case it changes.
185 */
186static void
188{
189 xon_cell_t xon;
190 uint8_t payload[CELL_PAYLOAD_SIZE];
191 ssize_t xon_size;
192
193 memset(&xon, 0, sizeof(xon));
194 memset(payload, 0, sizeof(payload));
195
196 xon_cell_set_version(&xon, 0);
197 xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate);
198
199 if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) {
200 log_warn(LD_BUG, "Failed to encode xon cell");
201 return;
202 }
203
204 /* Store the advisory rate information, to send advisory updates if
205 * it changes */
206 stream->ewma_rate_last_sent = stream->ewma_drain_rate;
207
208 if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload,
209 (size_t)xon_size) == 0) {
210 /* Revert the xoff sent status, so we can send another one if need be */
211 stream->xoff_sent = false;
212
213 cc_stats_flow_num_xon_sent++;
214
215 /* If it's an entry conn, notify control port */
216 if (TO_CONN(stream)->type == CONN_TYPE_AP) {
218 STREAM_EVENT_XON_SENT,
219 0);
220 }
221 }
222}
223
224/**
225 * Process a stream XOFF, parsing it, and then stopping reading on
226 * the edge connection.
227 *
228 * Record that we have received an xoff, so we know not to resume
229 * reading on this edge conn until we get an XON.
230 *
231 * Returns false if the XOFF did not validate; true if it does.
232 */
233bool
235 const crypt_path_t *layer_hint,
236 const cell_t *cell)
237{
238 (void)cell;
239 bool retval = true;
240
241 if (BUG(!conn)) {
242 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
243 "Got XOFF on invalid stream?");
244 return false;
245 }
246
247 /* Make sure this XOFF came from the right hop */
248 if (!edge_uses_cpath(conn, layer_hint)) {
249 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
250 "Got XOFF from wrong hop.");
251 return false;
252 }
253
254 if (!edge_uses_flow_control(conn)) {
255 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
256 "Got XOFF for non-congestion control circuit");
257 return false;
258 }
259
260 if (conn->xoff_received) {
261 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
262 "Got multiple XOFF on connection");
263 return false;
264 }
265
266 /* If we are near the max, scale everything down */
267 if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
268 log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
269 conn->total_bytes_xmit,
270 conn->num_xoff_recv,
271 conn->num_xon_recv);
272 conn->total_bytes_xmit /= 2;
273 conn->num_xoff_recv /= 2;
274 conn->num_xon_recv /= 2;
275 }
276
277 conn->num_xoff_recv++;
278
279 /* Client-side check to make sure that XOFF is not sent too early,
280 * for dropmark attacks. The main sidechannel risk is early cells,
281 * but we also check to make sure that we have not received more XOFFs
282 * than could have been generated by the bytes we sent.
283 */
284 if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
285 uint32_t limit = 0;
286 if (conn->hs_ident)
287 limit = xoff_client;
288 else
289 limit = xoff_exit;
290
291 if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) {
292 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
293 "Got extra XOFF for bytes sent. Got %d, expected max %d",
294 conn->num_xoff_recv, conn->total_bytes_xmit/limit);
295 /* We still process this, because the only dropmark defenses
296 * in C tor are via the vanguards addon's use of the read valid
297 * cells. So just signal that we think this is not valid protocol
298 * data and proceed. */
299 retval = false;
300 }
301 }
302
303 log_info(LD_EDGE, "Got XOFF!");
305 conn->xoff_received = true;
306
307 /* If this is an entry conn, notify control port */
308 if (TO_CONN(conn)->type == CONN_TYPE_AP) {
310 STREAM_EVENT_XOFF_RECV,
311 0);
312 }
313
314 return retval;
315}
316
317/**
318 * Process a stream XON, and if it validates, clear the xoff
319 * flag and resume reading on this edge connection.
320 *
321 * Also, use provided rate information to rate limit
322 * reading on this edge (or packagaing from it onto
323 * the circuit), to avoid XON/XOFF chatter.
324 *
325 * Returns true if the XON validates, false otherwise.
326 */
327bool
329 const crypt_path_t *layer_hint,
330 const cell_t *cell)
331{
332 xon_cell_t *xon;
333 bool retval = true;
334
335 if (BUG(!conn)) {
336 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
337 "Got XON on invalid stream?");
338 return false;
339 }
340
341 /* Make sure this XON came from the right hop */
342 if (!edge_uses_cpath(conn, layer_hint)) {
343 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
344 "Got XON from wrong hop.");
345 return false;
346 }
347
348 if (!edge_uses_flow_control(conn)) {
349 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
350 "Got XON for non-congestion control circuit");
351 return false;
352 }
353
354 if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE,
356 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
357 "Received malformed XON cell.");
358 return false;
359 }
360
361 /* If we are near the max, scale everything down */
362 if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
363 log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
364 conn->total_bytes_xmit,
365 conn->num_xoff_recv,
366 conn->num_xon_recv);
367 conn->total_bytes_xmit /= 2;
368 conn->num_xoff_recv /= 2;
369 conn->num_xon_recv /= 2;
370 }
371
372 conn->num_xon_recv++;
373
374 /* Client-side check to make sure that XON is not sent too early,
375 * for dropmark attacks. The main sidechannel risk is early cells,
376 * but we also check to see that we did not get more XONs than make
377 * sense for the number of bytes we sent.
378 */
379 if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
380 uint32_t limit = 0;
381
382 if (conn->hs_ident)
383 limit = MIN(xoff_client, xon_rate_bytes);
384 else
385 limit = MIN(xoff_exit, xon_rate_bytes);
386
387 if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
388 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
389 "Got extra XON for bytes sent. Got %d, expected max %d",
390 conn->num_xon_recv, conn->total_bytes_xmit/limit);
391
392 /* We still process this, because the only dropmark defenses
393 * in C tor are via the vanguards addon's use of the read valid
394 * cells. So just signal that we think this is not valid protocol
395 * data and proceed. */
396 retval = false;
397 }
398 }
399
400 log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);
401
402 /* Adjust the token bucket of this edge connection with the drain rate in
403 * the XON. Rate is in bytes from kilobit (kpbs). */
404 uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
405 if (rate == 0 || INT32_MAX < rate) {
406 /* No rate. */
407 rate = INT32_MAX;
408 }
409 token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);
410
411 if (conn->xoff_received) {
412 /* Clear the fact that we got an XOFF, so that this edge can
413 * start and stop reading normally */
414 conn->xoff_received = false;
416 }
417
418 /* If this is an entry conn, notify control port */
419 if (TO_CONN(conn)->type == CONN_TYPE_AP) {
421 STREAM_EVENT_XON_RECV,
422 0);
423 }
424
425 xon_cell_free(xon);
426
427 return retval;
428}
429
430/**
431 * Called from sendme_stream_data_received(), when data arrives
432 * from a circuit to our edge's outbuf, to decide if we need to send
433 * an XOFF.
434 *
435 * Returns the amount of cells remaining until the buffer is full, at
436 * which point it sends an XOFF, and returns 0.
437 *
438 * Returns less than 0 if we have queued more than a congestion window
439 * worth of data and need to close the circuit.
440 */
441int
443{
444 size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
445 uint32_t buffer_limit_xoff = 0;
446
447 if (BUG(!edge_uses_flow_control(stream))) {
448 log_err(LD_BUG, "Flow control called for non-congestion control circuit");
449 return -1;
450 }
451
452 /* Onion services and clients are typically localhost edges, so they
453 * need different buffering limits than exits do */
454 if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
455 buffer_limit_xoff = xoff_client;
456 } else {
457 buffer_limit_xoff = xoff_exit;
458 }
459
460 if (total_buffered > buffer_limit_xoff) {
461 if (!stream->xoff_sent) {
462 log_info(LD_EDGE, "Sending XOFF: %"TOR_PRIuSZ" %d",
463 total_buffered, buffer_limit_xoff);
464 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
465
466 cc_stats_flow_xoff_outbuf_ma =
467 stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
468 total_buffered);
469
471
472 /* Clear the drain rate. It is considered wrong if we
473 * got all the way to XOFF */
474 stream->ewma_drain_rate = 0;
475 }
476 }
477
478 /* If the outbuf has accumulated more than the expected burst limit of
479 * cells, then assume it is not draining, and call decide_xon. We must
480 * do this because writes only happen when the socket unblocks, so
481 * may not otherwise notice accumulation of data in the outbuf for
482 * advisory XONs. */
483 if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) {
484 flow_control_decide_xon(stream, 0);
485 }
486
487 /* Flow control always takes more data; we rely on the oomkiller to
488 * handle misbehavior. */
489 return 0;
490}
491
492/**
493 * Returns true if the stream's drain rate has changed significantly.
494 *
495 * Returns false if the monotime clock is stalled, or if we have
496 * no previous drain rate information.
497 */
498static bool
500{
502 return false;
503 }
504
505 if (!stream->ewma_rate_last_sent) {
506 return false;
507 }
508
509 if (stream->ewma_drain_rate >
510 (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
511 return true;
512 }
513
514 if (stream->ewma_drain_rate <
515 (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
516 return true;
517 }
518
519 return false;
520}
521
522/**
523 * Called whenever we drain an edge connection outbuf by writing on
524 * its socket, to decide if it is time to send an xon.
525 *
526 * The n_written parameter tells us how many bytes we have written
527 * this time, which is used to compute the advisory drain rate fields.
528 */
529void
531{
532 size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
533
534 /* Bounds check the number of drained bytes, and scale */
535 if (stream->drained_bytes >= UINT32_MAX - n_written) {
536 /* Cut the bytes in half, and move the start time up halfway to now
537 * (if we have one). */
538 stream->drained_bytes /= 2;
539
540 if (stream->drain_start_usec) {
541 uint64_t now = monotime_absolute_usec();
542
543 stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
544 }
545 }
546
547 /* Accumulate drained bytes since last rate computation */
548 stream->drained_bytes += n_written;
549
550 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);
551
552 /* Check for bad monotime clock and bytecount wrap */
554 /* If the monotime clock ever goes wrong, the safest thing to do
555 * is just clear our short-term rate info and wait for the clock to
556 * become reliable again.. */
557 stream->drain_start_usec = 0;
558 stream->drained_bytes = 0;
559 } else {
560 /* If we have no drain start timestamp, and we still have
561 * remaining buffer, start the buffering counter */
562 if (!stream->drain_start_usec && total_buffered > 0) {
563 log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ,
564 stream->ewma_rate_last_sent,
565 stream->ewma_drain_rate,
566 total_buffered);
567 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
568 stream);
570 stream->drained_bytes = 0;
571 }
572 }
573
574 if (stream->drain_start_usec) {
575 /* If we have spent enough time in a queued state, update our drain
576 * rate. */
577 if (stream->drained_bytes > xon_rate_bytes) {
578 /* No previous drained bytes means it is the first time we are computing
579 * it so use the value we just drained onto the socket as a baseline. It
580 * won't be accurate but it will be a start towards the right value.
581 *
582 * We have to do this in order to have a drain rate else we could be
583 * sending a drain rate of 0 in an XON which would be undesirable and
584 * basically like sending an XOFF. */
585 if (stream->prev_drained_bytes == 0) {
586 stream->prev_drained_bytes = stream->drained_bytes;
587 }
588 uint32_t drain_rate = compute_drain_rate(stream);
589 /* Once the drain rate has been computed, note how many bytes we just
590 * drained so it can be used at the next calculation. We do this here
591 * because it gets reset once the rate is changed. */
592 stream->prev_drained_bytes = stream->drained_bytes;
593
594 if (drain_rate) {
595 stream->ewma_drain_rate =
596 (uint32_t)n_count_ewma(drain_rate,
597 stream->ewma_drain_rate,
598 xon_ewma_cnt);
599 log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ,
600 drain_rate,
601 stream->ewma_drain_rate,
602 total_buffered);
603 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
604 stream, drain_rate);
605 /* Reset recent byte counts. This prevents us from sending advisory
606 * XONs more frequent than every xon_rate_bytes. */
607 stream->drained_bytes = 0;
608 stream->drain_start_usec = 0;
609 }
610 }
611 }
612
613 /* If we don't have an XOFF outstanding, consider updating an
614 * old rate */
615 if (!stream->xoff_sent) {
616 if (stream_drain_rate_changed(stream)) {
617 /* If we are still buffering and the rate changed, update
618 * advisory XON */
619 log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ,
620 stream->ewma_rate_last_sent,
621 stream->ewma_drain_rate,
622 total_buffered);
623 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
624
625 cc_stats_flow_xon_outbuf_ma =
626 stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
627 total_buffered);
628
630 }
631 } else if (total_buffered == 0) {
632 log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ,
633 stream->ewma_rate_last_sent,
634 stream->ewma_drain_rate,
635 total_buffered);
636 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
638 }
639
640 /* If the buffer has fully emptied, clear the drain timestamp,
641 * so we can total only bytes drained while outbuf is 0. */
642 if (total_buffered == 0) {
643 stream->drain_start_usec = 0;
644
645 /* After we've spent 'xon_rate_bytes' with the queue fully drained,
646 * double any rate we sent. */
647 if (stream->drained_bytes >= xon_rate_bytes &&
648 stream->ewma_rate_last_sent) {
649 stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);
650
651 log_debug(LD_EDGE,
652 "Queue empty for xon_rate_limit bytes: %d %d",
653 stream->ewma_rate_last_sent,
654 stream->ewma_drain_rate);
655 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
656 /* Resetting the drained bytes count. We need to keep its value as a
657 * previous so the drain rate calculation takes into account what was
658 * actually drain the last time. */
659 stream->prev_drained_bytes = stream->drained_bytes;
660 stream->drained_bytes = 0;
661 }
662 }
663
664 return;
665}
666
667/**
668 * Note that we packaged some data on this stream. Used to enforce
669 * client-side dropmark limits
670 */
671void
673{
674 /* If we are near the max, scale everything down */
675 if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
676 log_info(LD_EDGE, "Scaling down for flow control xmit bytes:: %d %d %d",
677 stream->total_bytes_xmit,
678 stream->num_xoff_recv,
679 stream->num_xon_recv);
680
681 stream->total_bytes_xmit /= 2;
682 stream->num_xoff_recv /= 2;
683 stream->num_xon_recv /= 2;
684 }
685
686 stream->total_bytes_xmit += len;
687}
688
689/** Returns true if an edge connection uses flow control */
690bool
692{
693 bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) ||
694 (stream->cpath_layer && stream->cpath_layer->ccontrol);
695
696 /* All circuits with congestion control use flow control */
697 return ret;
698}
699
700/** Returns true if a connection is an edge conn that uses flow control */
701bool
703{
704 bool ret = false;
705
706 if (CONN_IS_EDGE(conn)) {
707 edge_connection_t *edge = TO_EDGE_CONN(conn);
708
709 if (edge_uses_flow_control(edge)) {
710 ret = true;
711 }
712 }
713
714 return ret;
715}
716
Fixed-size cell structure.
Header file for circuitlist.c.
#define MAX(a, b)
Definition: cmp.h:22
uint64_t monotime_absolute_usec(void)
Definition: compat_time.c:804
Header file for config.c.
bool edge_uses_cpath(const edge_connection_t *conn, const crypt_path_t *cpath)
Definition: conflux_util.c:172
Header file for conflux_util.c.
bool is_monotime_clock_reliable(void)
Public APIs for congestion control.
static uint64_t n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
bool circuit_process_stream_xoff(edge_connection_t *conn, const crypt_path_t *layer_hint, const cell_t *cell)
static uint32_t compute_drain_rate(const edge_connection_t *stream)
int flow_control_decide_xoff(edge_connection_t *stream)
void flow_control_note_sent_data(edge_connection_t *stream, size_t len)
static bool stream_drain_rate_changed(const edge_connection_t *stream)
bool conn_uses_flow_control(connection_t *conn)
void flow_control_new_consensus_params(const networkstatus_t *ns)
bool edge_uses_flow_control(const edge_connection_t *stream)
uint64_t cc_stats_flow_num_xoff_sent
static void circuit_send_stream_xon(edge_connection_t *stream)
bool circuit_process_stream_xon(edge_connection_t *conn, const crypt_path_t *layer_hint, const cell_t *cell)
void flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
static void circuit_send_stream_xoff(edge_connection_t *stream)
static uint32_t xoff_client
APIs for stream flow control on congestion controlled circuits.
Structure definitions for congestion control.
Header file for connection.c.
#define CONN_TYPE_AP
Definition: connection.h:51
entry_connection_t * TO_ENTRY_CONN(connection_t *c)
edge_connection_t * TO_EDGE_CONN(connection_t *c)
Header file for connection_edge.c.
Base connection structure.
#define CONN_IS_EDGE(x)
int control_event_stream_status(entry_connection_t *conn, stream_status_event_t tp, int reason_code)
Header file for control_events.c.
#define TR_SUBSYS(name)
Definition: events.h:45
#define log_fn(severity, domain, args,...)
Definition: log.h:283
#define LD_EDGE
Definition: log.h:94
#define LD_BUG
Definition: log.h:86
void connection_stop_reading(connection_t *conn)
Definition: mainloop.c:601
void connection_start_reading(connection_t *conn)
Definition: mainloop.c:623
Header file for mainloop.c.
int32_t networkstatus_get_param(const networkstatus_t *ns, const char *param_name, int32_t default_val, int32_t min_val, int32_t max_val)
Header file for networkstatus.c.
Master header file for Tor-specific functionality.
#define CELL_PAYLOAD_SIZE
Definition: or.h:466
#define RELAY_PAYLOAD_SIZE
Definition: or.h:495
#define TO_CONN(c)
Definition: or.h:613
#define RELAY_HEADER_SIZE
Definition: or.h:493
int connection_edge_send_command(edge_connection_t *fromconn, uint8_t relay_command, const char *payload, size_t payload_len)
Definition: relay.c:800
Header file for relay.c.
Header for stats.c.
Definition: cell_st.h:17
uint8_t payload[CELL_PAYLOAD_SIZE]
Definition: cell_st.h:21
struct congestion_control_t * ccontrol
Definition: circuit_st.h:250
struct congestion_control_t * ccontrol
Definition: crypt_path_st.h:88
struct crypt_path_t * cpath_layer
token_bucket_rw_t bucket
struct circuit_t * on_circuit
void token_bucket_rw_adjust(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst)
Definition: token_bucket.c:154