Tor 0.4.9.3-alpha-dev
Loading...
Searching...
No Matches
congestion_control_flow.c
Go to the documentation of this file.
1/* Copyright (c) 2019-2021, The Tor Project, Inc. */
2/* See LICENSE for licensing information */
3
4/**
5 * \file congestion_control_flow.c
6 * \brief Code that implements flow control for congestion controlled
7 * circuits.
8 */
9
10#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
11
12#include "core/or/or.h"
13
14#include "core/or/relay.h"
21#include "core/or/circuitlist.h"
22#include "core/or/trace_probes_cc.h"
24#include "trunnel/flow_control_cells.h"
26#include "lib/math/stats.h"
27
29#include "core/or/cell_st.h"
30#include "app/config/config.h"
32
/** Cache consensus parameters */
static uint32_t xoff_client;
static uint32_t xoff_exit;

static uint32_t xon_change_pct;
static uint32_t xon_ewma_cnt;
static uint32_t xon_rate_bytes;

/** Metricsport stats */
uint64_t cc_stats_flow_num_xoff_sent;
uint64_t cc_stats_flow_num_xon_sent;
double cc_stats_flow_xoff_outbuf_ma = 0;
double cc_stats_flow_xon_outbuf_ma = 0;

/* In normal operation, we can get a burst of up to 32 cells before returning
 * to libevent to flush the outbuf. This is a heuristic from hardcoded values
 * and strange logic in connection_bucket_get_share(). */
#define MAX_EXPECTED_CELL_BURST 32
51
52/* This is the grace period that we use to give the edge connection a chance to
53 * reduce its outbuf before we send an XOFF.
54 *
55 * The congestion control spec says:
56 * > If the length of an edge outbuf queue exceeds the size provided in the
57 * > appropriate client or exit XOFF consensus parameter, a
58 * > RELAY_COMMAND_STREAM_XOFF will be sent
59 *
60 * This doesn't directly adapt well to tor, where we process many incoming
61 * messages at once. We may buffer a lot of stream data before giving the
62 * mainloop a chance to flush the the edge connection's outbuf, even if the
63 * edge connection's socket is able to accept more bytes.
64 *
65 * Instead if we detect that we should send an XOFF (as described in the cc
66 * spec), we delay sending an XOFF for `XOFF_GRACE_PERIOD_USEC` microseconds.
67 * This gives the mainloop a chance to flush the buffer to the edge
68 * connection's socket. If this flush causes the outbuf queue to shrink under
69 * our XOFF limit, then we no longer need to send an XOFF. If after
70 * `XOFF_GRACE_PERIOD_USEC` we receive another message and the outbuf queue
71 * still exceeds the XOFF limit, we send an XOFF.
72 *
73 * The value of 5 milliseconds was chosen arbitrarily. In practice it should be
74 * enough time for the edge connection to get a chance to flush, but not too
75 * long to cause excessive buffering.
76 */
77#define XOFF_GRACE_PERIOD_USEC (5000)
78
79/* The following three are for dropmark rate limiting. They define when we
80 * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
81 * because it limits the ability of spurious XON/XOFF to be sent after large
82 * amounts of data without XON/XOFF. At these limits, after 10MB of data (or
83 * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
84 * cells of fake XOFF/XON before the xmit byte count will be halved enough to
85 * triggering a limit. */
86#define XON_COUNT_SCALE_AT 200
87#define XOFF_COUNT_SCALE_AT 200
88#define ONE_MEGABYTE (UINT64_C(1) << 20)
89#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
90
91/**
92 * Update global congestion control related consensus parameter values, every
93 * consensus update.
94 *
95 * More details for each of the parameters can be found in proposal 324,
96 * section 6.5 including tuning notes.
97 */
98void
100{
101#define CC_XOFF_CLIENT_DFLT 500
102#define CC_XOFF_CLIENT_MIN 1
103#define CC_XOFF_CLIENT_MAX 10000
104 xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
105 CC_XOFF_CLIENT_DFLT,
106 CC_XOFF_CLIENT_MIN,
107 CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE_MIN;
108
109#define CC_XOFF_EXIT_DFLT 500
110#define CC_XOFF_EXIT_MIN 1
111#define CC_XOFF_EXIT_MAX 10000
112 xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
113 CC_XOFF_EXIT_DFLT,
114 CC_XOFF_EXIT_MIN,
115 CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE_MIN;
116
117#define CC_XON_CHANGE_PCT_DFLT 25
118#define CC_XON_CHANGE_PCT_MIN 1
119#define CC_XON_CHANGE_PCT_MAX 99
120 xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
121 CC_XON_CHANGE_PCT_DFLT,
122 CC_XON_CHANGE_PCT_MIN,
123 CC_XON_CHANGE_PCT_MAX);
124
125#define CC_XON_RATE_BYTES_DFLT (500)
126#define CC_XON_RATE_BYTES_MIN (1)
127#define CC_XON_RATE_BYTES_MAX (5000)
128 xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate",
129 CC_XON_RATE_BYTES_DFLT,
130 CC_XON_RATE_BYTES_MIN,
131 CC_XON_RATE_BYTES_MAX)*RELAY_PAYLOAD_SIZE_MAX;
132
133#define CC_XON_EWMA_CNT_DFLT (2)
134#define CC_XON_EWMA_CNT_MIN (2)
135#define CC_XON_EWMA_CNT_MAX (100)
136 xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
137 CC_XON_EWMA_CNT_DFLT,
138 CC_XON_EWMA_CNT_MIN,
139 CC_XON_EWMA_CNT_MAX);
140}
141
142/**
143 * Send an XOFF for this stream, and note that we sent one
144 */
145static void
147{
148 xoff_cell_t xoff;
149 uint8_t payload[CELL_PAYLOAD_SIZE];
150 ssize_t xoff_size;
151
152 memset(&xoff, 0, sizeof(xoff));
153 memset(payload, 0, sizeof(payload));
154
155 xoff_cell_set_version(&xoff, 0);
156
157 if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
158 log_warn(LD_BUG, "Failed to encode xon cell");
159 return;
160 }
161
162 if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
163 (char*)payload, (size_t)xoff_size) == 0) {
164 stream->xoff_sent = true;
166
167 /* If this is an entry conn, notify control port */
168 if (TO_CONN(stream)->type == CONN_TYPE_AP) {
170 STREAM_EVENT_XOFF_SENT,
171 0);
172 }
173 }
174}
175
176/**
177 * Compute the recent drain rate (write rate) for this edge
178 * connection and return it, in KB/sec (1000 bytes/sec).
179 *
180 * Returns 0 if the monotime clock is busted.
181 */
182static inline uint32_t
184{
185 if (BUG(!is_monotime_clock_reliable())) {
186 log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
187 return 0;
188 }
189
190 uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;
191
192 if (delta == 0) {
193 log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
194 return 0;
195 }
196
197 /* Overflow checks */
198 if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
199 stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
200 return INT32_MAX;
201 }
202
203 /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */
204 return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
205}
206
207/**
208 * Send an XON for this stream, with appropriate advisory rate information.
209 *
210 * Reverts the xoff sent status, and stores the rate information we sent,
211 * in case it changes.
212 */
213static void
215{
216 xon_cell_t xon;
217 uint8_t payload[CELL_PAYLOAD_SIZE];
218 ssize_t xon_size;
219
220 memset(&xon, 0, sizeof(xon));
221 memset(payload, 0, sizeof(payload));
222
223 xon_cell_set_version(&xon, 0);
224 xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate);
225
226 if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) {
227 log_warn(LD_BUG, "Failed to encode xon cell");
228 return;
229 }
230
231 /* Store the advisory rate information, to send advisory updates if
232 * it changes */
233 stream->ewma_rate_last_sent = stream->ewma_drain_rate;
234
235 if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload,
236 (size_t)xon_size) == 0) {
237 /* Revert the xoff sent status, so we can send another one if need be */
238 stream->xoff_sent = false;
239
240 cc_stats_flow_num_xon_sent++;
241
242 /* If it's an entry conn, notify control port */
243 if (TO_CONN(stream)->type == CONN_TYPE_AP) {
245 STREAM_EVENT_XON_SENT,
246 0);
247 }
248 }
249}
250
251/**
252 * Process a stream XOFF, parsing it, and then stopping reading on
253 * the edge connection.
254 *
255 * Record that we have received an xoff, so we know not to resume
256 * reading on this edge conn until we get an XON.
257 *
258 * Returns false if the XOFF did not validate; true if it does.
259 */
260bool
262 const crypt_path_t *layer_hint)
263{
264 bool retval = true;
265
266 if (BUG(!conn)) {
267 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
268 "Got XOFF on invalid stream?");
269 return false;
270 }
271
272 /* Make sure this XOFF came from the right hop */
273 if (!edge_uses_cpath(conn, layer_hint)) {
274 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
275 "Got XOFF from wrong hop.");
276 return false;
277 }
278
279 if (!edge_uses_flow_control(conn)) {
280 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
281 "Got XOFF for non-congestion control circuit");
282 return false;
283 }
284
285 if (conn->xoff_received) {
286 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
287 "Got multiple XOFF on connection");
288 return false;
289 }
290
291 /* If we are near the max, scale everything down */
292 if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
293 log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
294 conn->total_bytes_xmit,
295 conn->num_xoff_recv,
296 conn->num_xon_recv);
297 conn->total_bytes_xmit /= 2;
298 conn->num_xoff_recv /= 2;
299 conn->num_xon_recv /= 2;
300 }
301
302 conn->num_xoff_recv++;
303
304 /* Client-side check to make sure that XOFF is not sent too early,
305 * for dropmark attacks. The main sidechannel risk is early cells,
306 * but we also check to make sure that we have not received more XOFFs
307 * than could have been generated by the bytes we sent.
308 */
309 if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
310 uint32_t limit = 0;
311 if (conn->hs_ident)
312 limit = xoff_client;
313 else
314 limit = xoff_exit;
315
316 if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) {
317 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
318 "Got extra XOFF for bytes sent. Got %d, expected max %d",
319 conn->num_xoff_recv, conn->total_bytes_xmit/limit);
320 /* We still process this, because the only dropmark defenses
321 * in C tor are via the vanguards addon's use of the read valid
322 * cells. So just signal that we think this is not valid protocol
323 * data and proceed. */
324 retval = false;
325 }
326 }
327
328 log_info(LD_EDGE, "Got XOFF!");
330 conn->xoff_received = true;
331
332 /* If this is an entry conn, notify control port */
333 if (TO_CONN(conn)->type == CONN_TYPE_AP) {
335 STREAM_EVENT_XOFF_RECV,
336 0);
337 }
338
339 return retval;
340}
341
342/**
343 * Process a stream XON, and if it validates, clear the xoff
344 * flag and resume reading on this edge connection.
345 *
346 * Also, use provided rate information to rate limit
347 * reading on this edge (or packagaing from it onto
348 * the circuit), to avoid XON/XOFF chatter.
349 *
350 * Returns true if the XON validates, false otherwise.
351 */
352bool
354 const crypt_path_t *layer_hint,
355 const relay_msg_t *msg)
356{
357 xon_cell_t *xon;
358 bool retval = true;
359
360 if (BUG(!conn)) {
361 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
362 "Got XON on invalid stream?");
363 return false;
364 }
365
366 /* Make sure this XON came from the right hop */
367 if (!edge_uses_cpath(conn, layer_hint)) {
368 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
369 "Got XON from wrong hop.");
370 return false;
371 }
372
373 if (!edge_uses_flow_control(conn)) {
374 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
375 "Got XON for non-congestion control circuit");
376 return false;
377 }
378
379 if (xon_cell_parse(&xon, msg->body, msg->length) < 0) {
380 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
381 "Received malformed XON cell.");
382 return false;
383 }
384
385 /* If we are near the max, scale everything down */
386 if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
387 log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
388 conn->total_bytes_xmit,
389 conn->num_xoff_recv,
390 conn->num_xon_recv);
391 conn->total_bytes_xmit /= 2;
392 conn->num_xoff_recv /= 2;
393 conn->num_xon_recv /= 2;
394 }
395
396 conn->num_xon_recv++;
397
398 /* Client-side check to make sure that XON is not sent too early,
399 * for dropmark attacks. The main sidechannel risk is early cells,
400 * but we also check to see that we did not get more XONs than make
401 * sense for the number of bytes we sent.
402 */
403 if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
404 uint32_t limit = 0;
405
406 if (conn->hs_ident)
407 limit = MIN(xoff_client, xon_rate_bytes);
408 else
409 limit = MIN(xoff_exit, xon_rate_bytes);
410
411 if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
412 log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
413 "Got extra XON for bytes sent. Got %d, expected max %d",
414 conn->num_xon_recv, conn->total_bytes_xmit/limit);
415
416 /* We still process this, because the only dropmark defenses
417 * in C tor are via the vanguards addon's use of the read valid
418 * cells. So just signal that we think this is not valid protocol
419 * data and proceed. */
420 retval = false;
421 }
422 }
423
424 log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);
425
426 /* Adjust the token bucket of this edge connection with the drain rate in
427 * the XON. Rate is in bytes from kilobit (kpbs). */
428 uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
429 if (rate == 0 || INT32_MAX < rate) {
430 /* No rate. */
431 rate = INT32_MAX;
432 }
433 token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);
434
435 if (conn->xoff_received) {
436 /* Clear the fact that we got an XOFF, so that this edge can
437 * start and stop reading normally */
438 conn->xoff_received = false;
440 }
441
442 /* If this is an entry conn, notify control port */
443 if (TO_CONN(conn)->type == CONN_TYPE_AP) {
445 STREAM_EVENT_XON_RECV,
446 0);
447 }
448
449 xon_cell_free(xon);
450
451 return retval;
452}
453
454/**
455 * Called from sendme_stream_data_received(), when data arrives
456 * from a circuit to our edge's outbuf, to decide if we need to send
457 * an XOFF.
458 *
459 * Returns the amount of cells remaining until the buffer is full, at
460 * which point it sends an XOFF, and returns 0.
461 *
462 * Returns less than 0 if we have queued more than a congestion window
463 * worth of data and need to close the circuit.
464 */
465int
467{
468 size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
469 uint32_t buffer_limit_xoff = 0;
470
471 if (BUG(!edge_uses_flow_control(stream))) {
472 log_err(LD_BUG, "Flow control called for non-congestion control circuit");
473 return -1;
474 }
475
476 /* Onion services and clients are typically localhost edges, so they
477 * need different buffering limits than exits do */
478 if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
479 buffer_limit_xoff = xoff_client;
480 } else {
481 buffer_limit_xoff = xoff_exit;
482 }
483
484 if (total_buffered > buffer_limit_xoff) {
485 if (!stream->xoff_sent) {
486 uint64_t now = monotime_absolute_usec();
487
488 if (stream->xoff_grace_period_start_usec == 0) {
489 /* If unset, we haven't begun the XOFF grace period. We need to start.
490 */
491 log_debug(LD_EDGE,
492 "Exceeded XOFF limit; Beginning grace period: "
493 "total-buffered=%" TOR_PRIuSZ " xoff-limit=%d",
494 total_buffered, buffer_limit_xoff);
495
496 stream->xoff_grace_period_start_usec = now;
497 } else if (now > stream->xoff_grace_period_start_usec +
498 XOFF_GRACE_PERIOD_USEC) {
499 /* If we've exceeded our XOFF grace period, we need to send an XOFF. */
500 log_info(LD_EDGE,
501 "Sending XOFF: total-buffered=%" TOR_PRIuSZ
502 " xoff-limit=%d grace-period-dur=%" PRIu64 "usec",
503 total_buffered, buffer_limit_xoff,
504 now - stream->xoff_grace_period_start_usec);
505 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
506
507 cc_stats_flow_xoff_outbuf_ma =
508 stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
509 total_buffered);
510
512
513 /* Clear the drain rate. It is considered wrong if we
514 * got all the way to XOFF */
515 stream->ewma_drain_rate = 0;
516
517 /* Unset our grace period. */
519 } else {
520 /* Else we're in the XOFF grace period, so don't do anything. */
521 }
522 }
523 } else {
524 /* The outbuf length is less than the XOFF limit, so unset our grace
525 * period. */
527 }
528
529 /* If the outbuf has accumulated more than the expected burst limit of
530 * cells, then assume it is not draining, and call decide_xon. We must
531 * do this because writes only happen when the socket unblocks, so
532 * may not otherwise notice accumulation of data in the outbuf for
533 * advisory XONs. */
534 if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE_MIN) {
535 flow_control_decide_xon(stream, 0);
536 }
537
538 /* Flow control always takes more data; we rely on the oomkiller to
539 * handle misbehavior. */
540 return 0;
541}
542
543/**
544 * Returns true if the stream's drain rate has changed significantly.
545 *
546 * Returns false if the monotime clock is stalled, or if we have
547 * no previous drain rate information.
548 */
549static bool
551{
553 return false;
554 }
555
556 if (!stream->ewma_rate_last_sent) {
557 return false;
558 }
559
560 if (stream->ewma_drain_rate >
561 (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
562 return true;
563 }
564
565 if (stream->ewma_drain_rate <
566 (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
567 return true;
568 }
569
570 return false;
571}
572
573/**
574 * Called whenever we drain an edge connection outbuf by writing on
575 * its socket, to decide if it is time to send an xon.
576 *
577 * The n_written parameter tells us how many bytes we have written
578 * this time, which is used to compute the advisory drain rate fields.
579 */
580void
582{
583 size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
584
585 /* Bounds check the number of drained bytes, and scale */
586 if (stream->drained_bytes >= UINT32_MAX - n_written) {
587 /* Cut the bytes in half, and move the start time up halfway to now
588 * (if we have one). */
589 stream->drained_bytes /= 2;
590
591 if (stream->drain_start_usec) {
592 uint64_t now = monotime_absolute_usec();
593
594 stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
595 }
596 }
597
598 /* Accumulate drained bytes since last rate computation */
599 stream->drained_bytes += n_written;
600
601 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);
602
603 /* Check for bad monotime clock and bytecount wrap */
605 /* If the monotime clock ever goes wrong, the safest thing to do
606 * is just clear our short-term rate info and wait for the clock to
607 * become reliable again.. */
608 stream->drain_start_usec = 0;
609 stream->drained_bytes = 0;
610 } else {
611 /* If we have no drain start timestamp, and we still have
612 * remaining buffer, start the buffering counter */
613 if (!stream->drain_start_usec && total_buffered > 0) {
614 log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ,
615 stream->ewma_rate_last_sent,
616 stream->ewma_drain_rate,
617 total_buffered);
618 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
619 stream);
621 stream->drained_bytes = 0;
622 }
623 }
624
625 if (stream->drain_start_usec) {
626 /* If we have spent enough time in a queued state, update our drain
627 * rate. */
628 if (stream->drained_bytes > xon_rate_bytes) {
629 /* No previous drained bytes means it is the first time we are computing
630 * it so use the value we just drained onto the socket as a baseline. It
631 * won't be accurate but it will be a start towards the right value.
632 *
633 * We have to do this in order to have a drain rate else we could be
634 * sending a drain rate of 0 in an XON which would be undesirable and
635 * basically like sending an XOFF. */
636 if (stream->prev_drained_bytes == 0) {
637 stream->prev_drained_bytes = stream->drained_bytes;
638 }
639 uint32_t drain_rate = compute_drain_rate(stream);
640 /* Once the drain rate has been computed, note how many bytes we just
641 * drained so it can be used at the next calculation. We do this here
642 * because it gets reset once the rate is changed. */
643 stream->prev_drained_bytes = stream->drained_bytes;
644
645 if (drain_rate) {
646 stream->ewma_drain_rate =
647 (uint32_t)n_count_ewma(drain_rate,
648 stream->ewma_drain_rate,
649 xon_ewma_cnt);
650 log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ,
651 drain_rate,
652 stream->ewma_drain_rate,
653 total_buffered);
654 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
655 stream, drain_rate);
656 /* Reset recent byte counts. This prevents us from sending advisory
657 * XONs more frequent than every xon_rate_bytes. */
658 stream->drained_bytes = 0;
659 stream->drain_start_usec = 0;
660 }
661 }
662 }
663
664 /* If we don't have an XOFF outstanding, consider updating an
665 * old rate */
666 if (!stream->xoff_sent) {
667 if (stream_drain_rate_changed(stream)) {
668 /* If we are still buffering and the rate changed, update
669 * advisory XON */
670 log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ,
671 stream->ewma_rate_last_sent,
672 stream->ewma_drain_rate,
673 total_buffered);
674 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
675
676 cc_stats_flow_xon_outbuf_ma =
677 stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
678 total_buffered);
679
681 }
682 } else if (total_buffered == 0) {
683 log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ,
684 stream->ewma_rate_last_sent,
685 stream->ewma_drain_rate,
686 total_buffered);
687 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
689 }
690
691 /* If the buffer has fully emptied, clear the drain timestamp,
692 * so we can total only bytes drained while outbuf is 0. */
693 if (total_buffered == 0) {
694 stream->drain_start_usec = 0;
695
696 /* After we've spent 'xon_rate_bytes' with the queue fully drained,
697 * double any rate we sent. */
698 if (stream->drained_bytes >= xon_rate_bytes &&
699 stream->ewma_rate_last_sent) {
700 stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);
701
702 log_debug(LD_EDGE,
703 "Queue empty for xon_rate_limit bytes: %d %d",
704 stream->ewma_rate_last_sent,
705 stream->ewma_drain_rate);
706 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
707 /* Resetting the drained bytes count. We need to keep its value as a
708 * previous so the drain rate calculation takes into account what was
709 * actually drain the last time. */
710 stream->prev_drained_bytes = stream->drained_bytes;
711 stream->drained_bytes = 0;
712 }
713 }
714
715 return;
716}
717
718/**
719 * Note that we packaged some data on this stream. Used to enforce
720 * client-side dropmark limits
721 */
722void
724{
725 /* If we are near the max, scale everything down */
726 if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
727 log_info(LD_EDGE, "Scaling down for flow control xmit bytes:: %d %d %d",
728 stream->total_bytes_xmit,
729 stream->num_xoff_recv,
730 stream->num_xon_recv);
731
732 stream->total_bytes_xmit /= 2;
733 stream->num_xoff_recv /= 2;
734 stream->num_xon_recv /= 2;
735 }
736
737 stream->total_bytes_xmit += len;
738}
739
740/** Returns true if an edge connection uses flow control */
741bool
743{
744 bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) ||
745 (stream->cpath_layer && stream->cpath_layer->ccontrol);
746
747 /* All circuits with congestion control use flow control */
748 return ret;
749}
750
751/** Returns true if a connection is an edge conn that uses flow control */
752bool
754{
755 bool ret = false;
756
757 if (CONN_IS_EDGE(conn)) {
758 edge_connection_t *edge = TO_EDGE_CONN(conn);
759
760 if (edge_uses_flow_control(edge)) {
761 ret = true;
762 }
763 }
764
765 return ret;
766}
Fixed-size cell structure.
Header file for circuitlist.c.
uint64_t monotime_absolute_usec(void)
Header file for config.c.
bool edge_uses_cpath(const edge_connection_t *conn, const crypt_path_t *cpath)
Header file for conflux_util.c.
bool is_monotime_clock_reliable(void)
Public APIs for congestion control.
static uint64_t n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
bool circuit_process_stream_xoff(edge_connection_t *conn, const crypt_path_t *layer_hint)
static uint32_t compute_drain_rate(const edge_connection_t *stream)
int flow_control_decide_xoff(edge_connection_t *stream)
void flow_control_note_sent_data(edge_connection_t *stream, size_t len)
static bool stream_drain_rate_changed(const edge_connection_t *stream)
bool conn_uses_flow_control(connection_t *conn)
void flow_control_new_consensus_params(const networkstatus_t *ns)
bool edge_uses_flow_control(const edge_connection_t *stream)
uint64_t cc_stats_flow_num_xoff_sent
bool circuit_process_stream_xon(edge_connection_t *conn, const crypt_path_t *layer_hint, const relay_msg_t *msg)
static void circuit_send_stream_xon(edge_connection_t *stream)
void flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
static void circuit_send_stream_xoff(edge_connection_t *stream)
static uint32_t xoff_client
APIs for stream flow control on congestion controlled circuits.
Structure definitions for congestion control.
Header file for connection.c.
#define CONN_TYPE_AP
Definition connection.h:51
entry_connection_t * TO_ENTRY_CONN(connection_t *c)
edge_connection_t * TO_EDGE_CONN(connection_t *c)
Header file for connection_edge.c.
Base connection structure.
#define CONN_IS_EDGE(x)
int control_event_stream_status(entry_connection_t *conn, stream_status_event_t tp, int reason_code)
Header file for control_events.c.
#define TR_SUBSYS(name)
Definition events.h:45
#define log_fn(severity, domain, args,...)
Definition log.h:283
#define LD_EDGE
Definition log.h:94
#define LD_BUG
Definition log.h:86
void connection_stop_reading(connection_t *conn)
Definition mainloop.c:601
void connection_start_reading(connection_t *conn)
Definition mainloop.c:623
Header file for mainloop.c.
int32_t networkstatus_get_param(const networkstatus_t *ns, const char *param_name, int32_t default_val, int32_t min_val, int32_t max_val)
Header file for networkstatus.c.
Master header file for Tor-specific functionality.
#define CELL_PAYLOAD_SIZE
Definition or.h:529
#define RELAY_PAYLOAD_SIZE_MIN
Definition or.h:579
#define TO_CONN(c)
Definition or.h:709
#define RELAY_PAYLOAD_SIZE_MAX
Definition or.h:576
int connection_edge_send_command(edge_connection_t *fromconn, uint8_t relay_command, const char *payload, size_t payload_len)
Definition relay.c:764
Header file for relay.c.
Header for stats.c.
struct congestion_control_t * ccontrol
Definition circuit_st.h:260
struct congestion_control_t * ccontrol
struct crypt_path_t * cpath_layer
token_bucket_rw_t bucket
uint64_t xoff_grace_period_start_usec
struct circuit_t * on_circuit
void token_bucket_rw_adjust(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst)