10#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
22#include "core/or/trace_probes_cc.h"
24#include "trunnel/flow_control_cells.h"
/* File-local flow-control tunables.
 * NOTE(review): presumably refreshed from consensus parameters, bounded by
 * the CC_*_DFLT/MIN/MAX macros in this file -- confirm where they are set. */

/* Used as the XOFF buffer limit that the stream's outbuf length is compared
 * against when deciding whether to send an XOFF; also combined with
 * xon_rate_bytes via MIN() when processing a received XON. */
35static uint32_t xoff_exit;
/* NOTE(review): not read in the visible fragments; the name suggests a
 * percentage threshold for rate-change XONs -- confirm. */
37static uint32_t xon_change_pct;
/* NOTE(review): not read in the visible fragments; the name suggests an
 * EWMA sample count for the drain-rate estimate -- confirm. */
38static uint32_t xon_ewma_cnt;
/* Combined with xoff_exit via MIN() to form a limit when processing a
 * received XON. */
39static uint32_t xon_rate_bytes;
/* Flow-control statistics with external linkage. */

/* Counter incremented in the XON sending path. */
43uint64_t cc_stats_flow_num_xon_sent;
/* Running average maintained with stats_update_running_avg() in the XOFF
 * decision path.
 * NOTE(review): presumably averages the outbuf length at XOFF time --
 * confirm the sampled value. */
44double cc_stats_flow_xoff_outbuf_ma = 0;
/* Running average maintained with stats_update_running_avg() in the XON
 * decision path.
 * NOTE(review): presumably averages the outbuf length at XON time --
 * confirm the sampled value. */
45double cc_stats_flow_xon_outbuf_ma = 0;
/* NOTE(review): not referenced in the visible fragments; presumably a slack
 * allowance, counted in cells, for bursts already in flight -- confirm. */
50#define MAX_EXPECTED_CELL_BURST 32
/* Grace period threshold (5000, i.e. 5 ms if the unit is microseconds as
 * the name and the "usec" log text indicate). It is compared against in the
 * XOFF decision path after the "Beginning grace period" point and before
 * the "Sending XOFF" log. */
77#define XOFF_GRACE_PERIOD_USEC (5000)
/* NOTE(review): presumably the XON / XOFF counter values at which the
 * per-stream counts are scaled down (see the "Scaling down for XON/XOFF
 * count" log messages) -- confirm against the scaling code. */
86#define XON_COUNT_SCALE_AT 200
87#define XOFF_COUNT_SCALE_AT 200
/* 2^20 as a 64-bit unsigned constant. */
88#define ONE_MEGABYTE (UINT64_C(1) << 20)
/* 10 * 2^20.
 * NOTE(review): presumably the transmitted-byte total at which the xmit
 * counter is scaled down (see the "Scaling down for flow control xmit
 * bytes" log message) -- confirm. */
89#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
/* Default / minimum / maximum triples for flow-control tunables.
 * NOTE(review): presumably the (default_val, min_val, max_val) arguments to
 * networkstatus_get_param() -- confirm at the call sites. */

/* Bounds for the client-side XOFF limit.
 * NOTE(review): unit not visible here (cells vs. bytes) -- confirm. */
101#define CC_XOFF_CLIENT_DFLT 500
102#define CC_XOFF_CLIENT_MIN 1
103#define CC_XOFF_CLIENT_MAX 10000
/* Bounds for the exit-side XOFF limit.
 * NOTE(review): unit not visible here (cells vs. bytes) -- confirm. */
109#define CC_XOFF_EXIT_DFLT 500
110#define CC_XOFF_EXIT_MIN 1
111#define CC_XOFF_EXIT_MAX 10000
/* Bounds for the XON change percentage (1..99, default 25); passed together
 * in DFLT, MIN, MAX order where the parameter is read. */
117#define CC_XON_CHANGE_PCT_DFLT 25
118#define CC_XON_CHANGE_PCT_MIN 1
119#define CC_XON_CHANGE_PCT_MAX 99
121 CC_XON_CHANGE_PCT_DFLT,
122 CC_XON_CHANGE_PCT_MIN,
123 CC_XON_CHANGE_PCT_MAX);
/* Default / minimum / maximum for the XON rate tunable.
 * NOTE(review): the name says bytes, but the raw value may be scaled before
 * it is stored -- confirm where the parameter is read. */
125#define CC_XON_RATE_BYTES_DFLT (500)
126#define CC_XON_RATE_BYTES_MIN (1)
127#define CC_XON_RATE_BYTES_MAX (5000)
129 CC_XON_RATE_BYTES_DFLT,
130 CC_XON_RATE_BYTES_MIN,
/* Default / minimum / maximum for the XON EWMA count tunable (2..100,
 * default 2).
 * NOTE(review): presumably the N used when smoothing the drain rate with
 * n_count_ewma() -- confirm. */
133#define CC_XON_EWMA_CNT_DFLT (2)
134#define CC_XON_EWMA_CNT_MIN (2)
135#define CC_XON_EWMA_CNT_MAX (100)
137 CC_XON_EWMA_CNT_DFLT,
139 CC_XON_EWMA_CNT_MAX);
152 memset(&xoff, 0,
sizeof(xoff));
153 memset(payload, 0,
sizeof(payload));
155 xoff_cell_set_version(&xoff, 0);
158 log_warn(
LD_BUG,
"Failed to encode xon cell");
163 (
char*)payload, (
size_t)xoff_size) == 0) {
170 STREAM_EVENT_XOFF_SENT,
182static inline uint32_t
186 log_warn(
LD_BUG,
"Computing drain rate with stalled monotime clock");
193 log_warn(
LD_BUG,
"Computing stream drain rate with zero time delta");
198 if (stream->prev_drained_bytes > INT32_MAX/1000 ||
199 stream->prev_drained_bytes/delta > INT32_MAX/1000) {
204 return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
220 memset(&xon, 0,
sizeof(xon));
221 memset(payload, 0,
sizeof(payload));
223 xon_cell_set_version(&xon, 0);
227 log_warn(
LD_BUG,
"Failed to encode xon cell");
236 (
size_t)xon_size) == 0) {
240 cc_stats_flow_num_xon_sent++;
245 STREAM_EVENT_XON_SENT,
268 "Got XOFF on invalid stream?");
275 "Got XOFF from wrong hop.");
281 "Got XOFF for non-congestion control circuit");
287 "Got multiple XOFF on connection");
293 log_info(
LD_EDGE,
"Scaling down for XOFF count: %d %d %d",
318 "Got extra XOFF for bytes sent. Got %d, expected max %d",
328 log_info(
LD_EDGE,
"Got XOFF!");
335 STREAM_EVENT_XOFF_RECV,
362 "Got XON on invalid stream?");
369 "Got XON from wrong hop.");
375 "Got XON for non-congestion control circuit");
379 if (xon_cell_parse(&xon, msg->body, msg->length) < 0) {
381 "Received malformed XON cell.");
387 log_info(
LD_EDGE,
"Scaling down for XON count: %d %d %d",
409 limit = MIN(xoff_exit, xon_rate_bytes);
413 "Got extra XON for bytes sent. Got %d, expected max %d",
424 log_info(
LD_EDGE,
"Got XON: %d", xon->kbps_ewma);
428 uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
429 if (rate == 0 || INT32_MAX < rate) {
445 STREAM_EVENT_XON_RECV,
468 size_t total_buffered = connection_get_outbuf_len(
TO_CONN(stream));
469 uint32_t buffer_limit_xoff = 0;
472 log_err(
LD_BUG,
"Flow control called for non-congestion control circuit");
481 buffer_limit_xoff = xoff_exit;
484 if (total_buffered > buffer_limit_xoff) {
492 "Exceeded XOFF limit; Beginning grace period: "
493 "total-buffered=%" TOR_PRIuSZ
" xoff-limit=%d",
494 total_buffered, buffer_limit_xoff);
498 XOFF_GRACE_PERIOD_USEC) {
501 "Sending XOFF: total-buffered=%" TOR_PRIuSZ
502 " xoff-limit=%d grace-period-dur=%" PRIu64
"usec",
503 total_buffered, buffer_limit_xoff,
505 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
507 cc_stats_flow_xoff_outbuf_ma =
508 stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
583 size_t total_buffered = connection_get_outbuf_len(
TO_CONN(stream));
601 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);
614 log_debug(
LD_EDGE,
"Began edge buffering: %d %d %"TOR_PRIuSZ,
618 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
636 if (stream->prev_drained_bytes == 0) {
650 log_debug(
LD_EDGE,
"Updating drain rate: %d %d %"TOR_PRIuSZ,
654 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
670 log_info(
LD_EDGE,
"Sending rate-change XON: %d %d %"TOR_PRIuSZ,
674 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
676 cc_stats_flow_xon_outbuf_ma =
677 stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
682 }
else if (total_buffered == 0) {
683 log_info(
LD_EDGE,
"Sending XON: %d %d %"TOR_PRIuSZ,
687 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
693 if (total_buffered == 0) {
703 "Queue empty for xon_rate_limit bytes: %d %d",
706 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
727 log_info(
LD_EDGE,
"Scaling down for flow control xmit bytes:: %d %d %d",
Fixed-size cell structure.
Header file for circuitlist.c.
uint64_t monotime_absolute_usec(void)
Header file for config.c.
bool edge_uses_cpath(const edge_connection_t *conn, const crypt_path_t *cpath)
Header file for conflux_util.c.
bool is_monotime_clock_reliable(void)
Public APIs for congestion control.
static uint64_t n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
bool circuit_process_stream_xoff(edge_connection_t *conn, const crypt_path_t *layer_hint)
static uint32_t compute_drain_rate(const edge_connection_t *stream)
int flow_control_decide_xoff(edge_connection_t *stream)
void flow_control_note_sent_data(edge_connection_t *stream, size_t len)
static bool stream_drain_rate_changed(const edge_connection_t *stream)
bool conn_uses_flow_control(connection_t *conn)
void flow_control_new_consensus_params(const networkstatus_t *ns)
bool edge_uses_flow_control(const edge_connection_t *stream)
uint64_t cc_stats_flow_num_xoff_sent
bool circuit_process_stream_xon(edge_connection_t *conn, const crypt_path_t *layer_hint, const relay_msg_t *msg)
static void circuit_send_stream_xon(edge_connection_t *stream)
void flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
static void circuit_send_stream_xoff(edge_connection_t *stream)
static uint32_t xoff_client
APIs for stream flow control on congestion controlled circuits.
Structure definitions for congestion control.
Header file for connection.c.
entry_connection_t * TO_ENTRY_CONN(connection_t *c)
edge_connection_t * TO_EDGE_CONN(connection_t *c)
Header file for connection_edge.c.
Base connection structure.
int control_event_stream_status(entry_connection_t *conn, stream_status_event_t tp, int reason_code)
Header file for control_events.c.
#define log_fn(severity, domain, args,...)
void connection_stop_reading(connection_t *conn)
void connection_start_reading(connection_t *conn)
Header file for mainloop.c.
int32_t networkstatus_get_param(const networkstatus_t *ns, const char *param_name, int32_t default_val, int32_t min_val, int32_t max_val)
Header file for networkstatus.c.
Master header file for Tor-specific functionality.
#define CELL_PAYLOAD_SIZE
#define RELAY_PAYLOAD_SIZE_MIN
#define RELAY_PAYLOAD_SIZE_MAX
int connection_edge_send_command(edge_connection_t *fromconn, uint8_t relay_command, const char *payload, size_t payload_len)
struct congestion_control_t * ccontrol
struct congestion_control_t * ccontrol
struct crypt_path_t * cpath_layer
uint64_t drain_start_usec
uint32_t total_bytes_xmit
uint64_t xoff_grace_period_start_usec
struct circuit_t * on_circuit
uint32_t ewma_rate_last_sent
void token_bucket_rw_adjust(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst)