10#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
22#include "core/or/trace_probes_cc.h"
24#include "trunnel/flow_control_cells.h"
35static uint32_t xoff_exit;
37static uint32_t xon_change_pct;
38static uint32_t xon_ewma_cnt;
39static uint32_t xon_rate_bytes;
43uint64_t cc_stats_flow_num_xon_sent;
44double cc_stats_flow_xoff_outbuf_ma = 0;
45double cc_stats_flow_xon_outbuf_ma = 0;
50#define MAX_EXPECTED_CELL_BURST 32
59#define XON_COUNT_SCALE_AT 200
60#define XOFF_COUNT_SCALE_AT 200
61#define ONE_MEGABYTE (UINT64_C(1) << 20)
62#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
74#define CC_XOFF_CLIENT_DFLT 500
75#define CC_XOFF_CLIENT_MIN 1
76#define CC_XOFF_CLIENT_MAX 10000
82#define CC_XOFF_EXIT_DFLT 500
83#define CC_XOFF_EXIT_MIN 1
84#define CC_XOFF_EXIT_MAX 10000
90#define CC_XON_CHANGE_PCT_DFLT 25
91#define CC_XON_CHANGE_PCT_MIN 1
92#define CC_XON_CHANGE_PCT_MAX 99
94 CC_XON_CHANGE_PCT_DFLT,
95 CC_XON_CHANGE_PCT_MIN,
96 CC_XON_CHANGE_PCT_MAX);
98#define CC_XON_RATE_BYTES_DFLT (500)
99#define CC_XON_RATE_BYTES_MIN (1)
100#define CC_XON_RATE_BYTES_MAX (5000)
102 CC_XON_RATE_BYTES_DFLT,
103 CC_XON_RATE_BYTES_MIN,
106#define CC_XON_EWMA_CNT_DFLT (2)
107#define CC_XON_EWMA_CNT_MIN (2)
108#define CC_XON_EWMA_CNT_MAX (100)
110 CC_XON_EWMA_CNT_DFLT,
112 CC_XON_EWMA_CNT_MAX);
125 memset(&xoff, 0,
sizeof(xoff));
126 memset(payload, 0,
sizeof(payload));
128 xoff_cell_set_version(&xoff, 0);
131 log_warn(
LD_BUG,
"Failed to encode xon cell");
136 (
char*)payload, (
size_t)xoff_size) == 0) {
143 STREAM_EVENT_XOFF_SENT,
155static inline uint32_t
159 log_warn(
LD_BUG,
"Computing drain rate with stalled monotime clock");
166 log_warn(
LD_BUG,
"Computing stream drain rate with zero time delta");
171 if (stream->prev_drained_bytes > INT32_MAX/1000 ||
172 stream->prev_drained_bytes/delta > INT32_MAX/1000) {
177 return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
193 memset(&xon, 0,
sizeof(xon));
194 memset(payload, 0,
sizeof(payload));
196 xon_cell_set_version(&xon, 0);
200 log_warn(
LD_BUG,
"Failed to encode xon cell");
209 (
size_t)xon_size) == 0) {
213 cc_stats_flow_num_xon_sent++;
218 STREAM_EVENT_XON_SENT,
243 "Got XOFF on invalid stream?");
250 "Got XOFF from wrong hop.");
256 "Got XOFF for non-congestion control circuit");
262 "Got multiple XOFF on connection");
268 log_info(
LD_EDGE,
"Scaling down for XOFF count: %d %d %d",
293 "Got extra XOFF for bytes sent. Got %d, expected max %d",
303 log_info(
LD_EDGE,
"Got XOFF!");
310 STREAM_EVENT_XOFF_RECV,
337 "Got XON on invalid stream?");
344 "Got XON from wrong hop.");
350 "Got XON for non-congestion control circuit");
357 "Received malformed XON cell.");
363 log_info(
LD_EDGE,
"Scaling down for XON count: %d %d %d",
385 limit = MIN(xoff_exit, xon_rate_bytes);
389 "Got extra XON for bytes sent. Got %d, expected max %d",
400 log_info(
LD_EDGE,
"Got XON: %d", xon->kbps_ewma);
404 uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
405 if (rate == 0 || INT32_MAX < rate) {
421 STREAM_EVENT_XON_RECV,
444 size_t total_buffered = connection_get_outbuf_len(
TO_CONN(stream));
445 uint32_t buffer_limit_xoff = 0;
448 log_err(
LD_BUG,
"Flow control called for non-congestion control circuit");
457 buffer_limit_xoff = xoff_exit;
460 if (total_buffered > buffer_limit_xoff) {
462 log_info(
LD_EDGE,
"Sending XOFF: %"TOR_PRIuSZ
" %d",
463 total_buffered, buffer_limit_xoff);
464 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
466 cc_stats_flow_xoff_outbuf_ma =
467 stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
532 size_t total_buffered = connection_get_outbuf_len(
TO_CONN(stream));
550 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);
563 log_debug(
LD_EDGE,
"Began edge buffering: %d %d %"TOR_PRIuSZ,
567 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
585 if (stream->prev_drained_bytes == 0) {
599 log_debug(
LD_EDGE,
"Updating drain rate: %d %d %"TOR_PRIuSZ,
603 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
619 log_info(
LD_EDGE,
"Sending rate-change XON: %d %d %"TOR_PRIuSZ,
623 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
625 cc_stats_flow_xon_outbuf_ma =
626 stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
631 }
else if (total_buffered == 0) {
632 log_info(
LD_EDGE,
"Sending XON: %d %d %"TOR_PRIuSZ,
636 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
642 if (total_buffered == 0) {
652 "Queue empty for xon_rate_limit bytes: %d %d",
655 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
676 log_info(
LD_EDGE,
"Scaling down for flow control xmit bytes:: %d %d %d",
Fixed-size cell structure.
Header file for circuitlist.c.
uint64_t monotime_absolute_usec(void)
Header file for config.c.
bool edge_uses_cpath(const edge_connection_t *conn, const crypt_path_t *cpath)
Header file for conflux_util.c.
bool is_monotime_clock_reliable(void)
Public APIs for congestion control.
static uint64_t n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
bool circuit_process_stream_xoff(edge_connection_t *conn, const crypt_path_t *layer_hint, const cell_t *cell)
static uint32_t compute_drain_rate(const edge_connection_t *stream)
int flow_control_decide_xoff(edge_connection_t *stream)
void flow_control_note_sent_data(edge_connection_t *stream, size_t len)
static bool stream_drain_rate_changed(const edge_connection_t *stream)
bool conn_uses_flow_control(connection_t *conn)
void flow_control_new_consensus_params(const networkstatus_t *ns)
bool edge_uses_flow_control(const edge_connection_t *stream)
uint64_t cc_stats_flow_num_xoff_sent
static void circuit_send_stream_xon(edge_connection_t *stream)
bool circuit_process_stream_xon(edge_connection_t *conn, const crypt_path_t *layer_hint, const cell_t *cell)
void flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
static void circuit_send_stream_xoff(edge_connection_t *stream)
static uint32_t xoff_client
APIs for stream flow control on congestion controlled circuits.
Structure definitions for congestion control.
Header file for connection.c.
entry_connection_t * TO_ENTRY_CONN(connection_t *c)
edge_connection_t * TO_EDGE_CONN(connection_t *c)
Header file for connection_edge.c.
Base connection structure.
int control_event_stream_status(entry_connection_t *conn, stream_status_event_t tp, int reason_code)
Header file for control_events.c.
#define log_fn(severity, domain, args,...)
void connection_stop_reading(connection_t *conn)
void connection_start_reading(connection_t *conn)
Header file for mainloop.c.
int32_t networkstatus_get_param(const networkstatus_t *ns, const char *param_name, int32_t default_val, int32_t min_val, int32_t max_val)
Header file for networkstatus.c.
Master header file for Tor-specific functionality.
#define CELL_PAYLOAD_SIZE
#define RELAY_PAYLOAD_SIZE
#define RELAY_HEADER_SIZE
int connection_edge_send_command(edge_connection_t *fromconn, uint8_t relay_command, const char *payload, size_t payload_len)
uint8_t payload[CELL_PAYLOAD_SIZE]
struct congestion_control_t * ccontrol
struct congestion_control_t * ccontrol
struct crypt_path_t * cpath_layer
uint64_t drain_start_usec
uint32_t total_bytes_xmit
struct circuit_t * on_circuit
uint32_t ewma_rate_last_sent
void token_bucket_rw_adjust(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst)