10#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
22#include "core/or/trace_probes_cc.h"
24#include "trunnel/flow_control_cells.h"
35static uint32_t xoff_exit;
37static uint32_t xon_change_pct;
38static uint32_t xon_ewma_cnt;
39static uint32_t xon_rate_bytes;
43uint64_t cc_stats_flow_num_xon_sent;
44double cc_stats_flow_xoff_outbuf_ma = 0;
45double cc_stats_flow_xon_outbuf_ma = 0;
50#define MAX_EXPECTED_CELL_BURST 32
59#define XON_COUNT_SCALE_AT 200
60#define XOFF_COUNT_SCALE_AT 200
61#define ONE_MEGABYTE (UINT64_C(1) << 20)
62#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
74#define CC_XOFF_CLIENT_DFLT 500
75#define CC_XOFF_CLIENT_MIN 1
76#define CC_XOFF_CLIENT_MAX 10000
82#define CC_XOFF_EXIT_DFLT 500
83#define CC_XOFF_EXIT_MIN 1
84#define CC_XOFF_EXIT_MAX 10000
90#define CC_XON_CHANGE_PCT_DFLT 25
91#define CC_XON_CHANGE_PCT_MIN 1
92#define CC_XON_CHANGE_PCT_MAX 99
94 CC_XON_CHANGE_PCT_DFLT,
95 CC_XON_CHANGE_PCT_MIN,
96 CC_XON_CHANGE_PCT_MAX);
98#define CC_XON_RATE_BYTES_DFLT (500)
99#define CC_XON_RATE_BYTES_MIN (1)
100#define CC_XON_RATE_BYTES_MAX (5000)
102 CC_XON_RATE_BYTES_DFLT,
103 CC_XON_RATE_BYTES_MIN,
106#define CC_XON_EWMA_CNT_DFLT (2)
107#define CC_XON_EWMA_CNT_MIN (2)
108#define CC_XON_EWMA_CNT_MAX (100)
110 CC_XON_EWMA_CNT_DFLT,
112 CC_XON_EWMA_CNT_MAX);
125 memset(&xoff, 0,
sizeof(xoff));
126 memset(payload, 0,
sizeof(payload));
128 xoff_cell_set_version(&xoff, 0);
131 log_warn(
LD_BUG,
"Failed to encode xon cell");
136 (
char*)payload, (
size_t)xoff_size) == 0) {
143 STREAM_EVENT_XOFF_SENT,
155static inline uint32_t
159 log_warn(
LD_BUG,
"Computing drain rate with stalled monotime clock");
166 log_warn(
LD_BUG,
"Computing stream drain rate with zero time delta");
171 if (stream->prev_drained_bytes > INT32_MAX/1000 ||
172 stream->prev_drained_bytes/delta > INT32_MAX/1000) {
177 return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
193 memset(&xon, 0,
sizeof(xon));
194 memset(payload, 0,
sizeof(payload));
196 xon_cell_set_version(&xon, 0);
200 log_warn(
LD_BUG,
"Failed to encode xon cell");
209 (
size_t)xon_size) == 0) {
213 cc_stats_flow_num_xon_sent++;
218 STREAM_EVENT_XON_SENT,
241 "Got XOFF on invalid stream?");
248 "Got XOFF from wrong hop.");
254 "Got XOFF for non-congestion control circuit");
260 "Got multiple XOFF on connection");
266 log_info(
LD_EDGE,
"Scaling down for XOFF count: %d %d %d",
291 "Got extra XOFF for bytes sent. Got %d, expected max %d",
301 log_info(
LD_EDGE,
"Got XOFF!");
308 STREAM_EVENT_XOFF_RECV,
335 "Got XON on invalid stream?");
342 "Got XON from wrong hop.");
348 "Got XON for non-congestion control circuit");
352 if (xon_cell_parse(&xon, msg->body, msg->length) < 0) {
354 "Received malformed XON cell.");
360 log_info(
LD_EDGE,
"Scaling down for XON count: %d %d %d",
382 limit = MIN(xoff_exit, xon_rate_bytes);
386 "Got extra XON for bytes sent. Got %d, expected max %d",
397 log_info(
LD_EDGE,
"Got XON: %d", xon->kbps_ewma);
401 uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
402 if (rate == 0 || INT32_MAX < rate) {
418 STREAM_EVENT_XON_RECV,
441 size_t total_buffered = connection_get_outbuf_len(
TO_CONN(stream));
442 uint32_t buffer_limit_xoff = 0;
445 log_err(
LD_BUG,
"Flow control called for non-congestion control circuit");
454 buffer_limit_xoff = xoff_exit;
457 if (total_buffered > buffer_limit_xoff) {
459 log_info(
LD_EDGE,
"Sending XOFF: %"TOR_PRIuSZ
" %d",
460 total_buffered, buffer_limit_xoff);
461 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
463 cc_stats_flow_xoff_outbuf_ma =
464 stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
529 size_t total_buffered = connection_get_outbuf_len(
TO_CONN(stream));
547 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);
560 log_debug(
LD_EDGE,
"Began edge buffering: %d %d %"TOR_PRIuSZ,
564 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
582 if (stream->prev_drained_bytes == 0) {
596 log_debug(
LD_EDGE,
"Updating drain rate: %d %d %"TOR_PRIuSZ,
600 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
616 log_info(
LD_EDGE,
"Sending rate-change XON: %d %d %"TOR_PRIuSZ,
620 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
622 cc_stats_flow_xon_outbuf_ma =
623 stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
628 }
else if (total_buffered == 0) {
629 log_info(
LD_EDGE,
"Sending XON: %d %d %"TOR_PRIuSZ,
633 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
639 if (total_buffered == 0) {
649 "Queue empty for xon_rate_limit bytes: %d %d",
652 tor_trace(
TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
673 log_info(
LD_EDGE,
"Scaling down for flow control xmit bytes:: %d %d %d",
Fixed-size cell structure.
Header file for circuitlist.c.
uint64_t monotime_absolute_usec(void)
Header file for config.c.
bool edge_uses_cpath(const edge_connection_t *conn, const crypt_path_t *cpath)
Header file for conflux_util.c.
bool is_monotime_clock_reliable(void)
Public APIs for congestion control.
static uint64_t n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
bool circuit_process_stream_xoff(edge_connection_t *conn, const crypt_path_t *layer_hint)
static uint32_t compute_drain_rate(const edge_connection_t *stream)
int flow_control_decide_xoff(edge_connection_t *stream)
void flow_control_note_sent_data(edge_connection_t *stream, size_t len)
static bool stream_drain_rate_changed(const edge_connection_t *stream)
bool conn_uses_flow_control(connection_t *conn)
void flow_control_new_consensus_params(const networkstatus_t *ns)
bool edge_uses_flow_control(const edge_connection_t *stream)
uint64_t cc_stats_flow_num_xoff_sent
bool circuit_process_stream_xon(edge_connection_t *conn, const crypt_path_t *layer_hint, const relay_msg_t *msg)
static void circuit_send_stream_xon(edge_connection_t *stream)
void flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
static void circuit_send_stream_xoff(edge_connection_t *stream)
static uint32_t xoff_client
APIs for stream flow control on congestion controlled circuits.
Structure definitions for congestion control.
Header file for connection.c.
entry_connection_t * TO_ENTRY_CONN(connection_t *c)
edge_connection_t * TO_EDGE_CONN(connection_t *c)
Header file for connection_edge.c.
Base connection structure.
int control_event_stream_status(entry_connection_t *conn, stream_status_event_t tp, int reason_code)
Header file for control_events.c.
#define log_fn(severity, domain, args,...)
void connection_stop_reading(connection_t *conn)
void connection_start_reading(connection_t *conn)
Header file for mainloop.c.
int32_t networkstatus_get_param(const networkstatus_t *ns, const char *param_name, int32_t default_val, int32_t min_val, int32_t max_val)
Header file for networkstatus.c.
Master header file for Tor-specific functionality.
#define CELL_PAYLOAD_SIZE
#define RELAY_PAYLOAD_SIZE_MIN
#define RELAY_PAYLOAD_SIZE_MAX
int connection_edge_send_command(edge_connection_t *fromconn, uint8_t relay_command, const char *payload, size_t payload_len)
struct congestion_control_t * ccontrol
struct congestion_control_t * ccontrol
struct crypt_path_t * cpath_layer
uint64_t drain_start_usec
uint32_t total_bytes_xmit
struct circuit_t * on_circuit
uint32_t ewma_rate_last_sent
void token_bucket_rw_adjust(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst)