tor_proto/stream/flow_control.rs
1//! Code for implementing flow control (stream-level).
2
3use postage::watch;
4use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
5use tor_cell::relaycell::msg::Sendme;
6use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
7
8use crate::congestion::sendme;
9use crate::util::notify::NotifySender;
10use crate::{Error, Result};
11
12/// The threshold number of incoming data bytes buffered on a stream at which we send an XOFF.
13// TODO(arti#534): We want to get the value from the consensus. The value in the consensus is the
14// number of relay cells, not number of bytes. But do we really want to use the number of relays
15// cells rather than bytes?
16#[cfg(feature = "flowctl-cc")]
17const CC_XOFF_CLIENT: usize = 250_000;
18
19/// Private internals of [`StreamFlowControl`].
20#[derive(Debug)]
21enum StreamFlowControlEnum {
22 /// "legacy" sendme-window-based flow control.
23 WindowBased(sendme::StreamSendWindow),
24 /// XON/XOFF flow control.
25 #[cfg(feature = "flowctl-cc")]
26 XonXoffBased(XonXoffControl),
27}
28
29/// Manages flow control for a stream.
30#[derive(Debug)]
31pub(crate) struct StreamFlowControl {
32 /// Private internal enum.
33 e: StreamFlowControlEnum,
34}
35
36impl StreamFlowControl {
37 /// Returns a new sendme-window-based [`StreamFlowControl`].
38 // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
39 // Unclear whether we need or want to support creating this object from a
40 // preexisting StreamSendWindow.
41 pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
42 Self {
43 e: StreamFlowControlEnum::WindowBased(window),
44 }
45 }
46
47 /// Returns a new xon/xoff-based [`StreamFlowControl`].
48 #[cfg(feature = "flowctl-cc")]
49 pub(crate) fn new_xon_xoff_based(
50 rate_limit_updater: watch::Sender<StreamRateLimit>,
51 drain_rate_requester: NotifySender<DrainRateRequest>,
52 ) -> Self {
53 Self {
54 e: StreamFlowControlEnum::XonXoffBased(XonXoffControl {
55 rate_limit_updater,
56 drain_rate_requester,
57 last_sent_xon_xoff: None,
58 }),
59 }
60 }
61
62 /// Whether this stream is ready to send `msg`.
63 pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
64 match &self.e {
65 StreamFlowControlEnum::WindowBased(w) => {
66 !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
67 }
68 #[cfg(feature = "flowctl-cc")]
69 StreamFlowControlEnum::XonXoffBased(_) => {
70 // we perform rate-limiting in the `DataWriter`,
71 // so we send any messages that made it past the `DataWriter`
72 true
73 }
74 }
75 }
76
77 /// Take capacity to send `msg`. If there's insufficient capacity, returns
78 /// an error.
79 // TODO: Consider having this method wrap the message in a type that
80 // "proves" we've applied flow control. This would make it easier to apply
81 // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
82 // ambiguity in the sending function as to whether flow control has already
83 // been applied or not.
84 pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
85 match &mut self.e {
86 StreamFlowControlEnum::WindowBased(w) => {
87 if sendme::cmd_counts_towards_windows(msg.cmd()) {
88 w.take().map(|_| ())
89 } else {
90 // TODO: Maybe make this an error?
91 // Ideally caller would have checked this already.
92 Ok(())
93 }
94 }
95 #[cfg(feature = "flowctl-cc")]
96 StreamFlowControlEnum::XonXoffBased(_) => {
97 // xon/xoff flow control doesn't have "capacity";
98 // the capacity is effectively controlled by the congestion control
99 Ok(())
100 }
101 }
102 }
103
104 /// Handle an incoming sendme.
105 ///
106 /// On success, return the number of cells left in the window.
107 ///
108 /// On failure, return an error: the caller should close the stream or
109 /// circuit with a protocol error.
110 ///
111 /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
112 /// correct type of flow control.
113 pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
114 match &mut self.e {
115 StreamFlowControlEnum::WindowBased(w) => {
116 let _sendme = msg
117 .decode::<Sendme>()
118 .map_err(|e| {
119 Error::from_bytes_err(e, "failed to decode stream sendme message")
120 })?
121 .into_msg();
122
123 w.put()
124 }
125 #[cfg(feature = "flowctl-cc")]
126 StreamFlowControlEnum::XonXoffBased(_) => Err(Error::CircProto(
127 "Stream level SENDME not allowed due to congestion control".into(),
128 )),
129 }
130 }
131
132 /// Handle an incoming XON message.
133 ///
134 /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
135 /// correct type of flow control.
136 pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
137 match &mut self.e {
138 StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
139 "XON messages not allowed with window flow control".into(),
140 )),
141 #[cfg(feature = "flowctl-cc")]
142 StreamFlowControlEnum::XonXoffBased(control) => {
143 let xon = msg
144 .decode::<Xon>()
145 .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
146 .into_msg();
147
148 // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
149 // > violation.
150 if *xon.version() != 0 {
151 return Err(Error::CircProto("Unrecognized XON version".into()));
152 }
153
154 let rate = match xon.kbps_ewma() {
155 XonKbpsEwma::Limited(rate_kbps) => {
156 let rate_kbps = u64::from(rate_kbps.get());
157 // convert from kbps to bytes/s
158 StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
159 }
160 XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
161 };
162
163 *control.rate_limit_updater.borrow_mut() = rate;
164 Ok(())
165 }
166 }
167 }
168
169 /// Handle an incoming XOFF message.
170 ///
171 /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
172 /// correct type of flow control.
173 pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
174 match &mut self.e {
175 StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
176 "XOFF messages not allowed with window flow control".into(),
177 )),
178 #[cfg(feature = "flowctl-cc")]
179 StreamFlowControlEnum::XonXoffBased(control) => {
180 let xoff = msg
181 .decode::<Xoff>()
182 .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
183 .into_msg();
184
185 // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
186 // > violation.
187 if *xoff.version() != 0 {
188 return Err(Error::CircProto("Unrecognized XOFF version".into()));
189 }
190
191 *control.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
192 Ok(())
193 }
194 }
195 }
196
197 /// Check if we should send an XON message.
198 ///
199 /// If we should, then returns the XON message that should be sent.
200 /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
201 pub(crate) fn maybe_send_xon(
202 &mut self,
203 rate: XonKbpsEwma,
204 buffer_len: usize,
205 ) -> Result<Option<Xon>> {
206 match &mut self.e {
207 StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
208 "XON messages cannot be sent with window flow control".into(),
209 )),
210 #[cfg(feature = "flowctl-cc")]
211 StreamFlowControlEnum::XonXoffBased(control) => {
212 if buffer_len > CC_XOFF_CLIENT {
213 // we can't send an XON, and we should have already sent an XOFF when the queue first
214 // exceeded the limit (see `maybe_send_xoff()`)
215 debug_assert!(matches!(
216 control.last_sent_xon_xoff,
217 Some(LastSentXonXoff::Xoff),
218 ));
219
220 // inform the stream reader that we need a new drain rate
221 control.drain_rate_requester.notify();
222 return Ok(None);
223 }
224
225 control.last_sent_xon_xoff = Some(LastSentXonXoff::Xon(rate));
226
227 Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
228 }
229 }
230 }
231
232 /// Check if we should send an XOFF message.
233 ///
234 /// If we should, then returns the XOFF message that should be sent.
235 /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
236 pub(crate) fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
237 match &mut self.e {
238 StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
239 "XOFF messages cannot be sent with window flow control".into(),
240 )),
241 #[cfg(feature = "flowctl-cc")]
242 StreamFlowControlEnum::XonXoffBased(control) => {
243 // if the last XON/XOFF we sent was an XOFF, no need to send another
244 if matches!(control.last_sent_xon_xoff, Some(LastSentXonXoff::Xoff)) {
245 return Ok(None);
246 }
247
248 if buffer_len <= CC_XOFF_CLIENT {
249 return Ok(None);
250 }
251
252 // either we have never sent an XOFF or XON, or we last sent an XON
253
254 // remember that we last sent an XOFF
255 control.last_sent_xon_xoff = Some(LastSentXonXoff::Xoff);
256
257 // inform the stream reader that we need a new drain rate
258 control.drain_rate_requester.notify();
259
260 Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
261 }
262 }
263 }
264}
265
266/// Control state for XON/XOFF flow control.
267#[derive(Debug)]
268struct XonXoffControl {
269 /// How we communicate rate limit updates to the
270 /// [`DataWriter`](crate::stream::data::DataWriter).
271 rate_limit_updater: watch::Sender<StreamRateLimit>,
272 /// How we communicate requests for new drain rate updates to the
273 /// [`XonXoffReader`](crate::stream::xon_xoff::XonXoffReader).
274 drain_rate_requester: NotifySender<DrainRateRequest>,
275 /// The last rate limit we sent.
276 last_sent_xon_xoff: Option<LastSentXonXoff>,
277}
278
279/// The last XON/XOFF message that we sent.
280#[derive(Debug)]
281enum LastSentXonXoff {
282 /// XON message with a rate.
283 // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
284 // If that doesn't end up being the case, then we should remove it.
285 #[expect(dead_code)]
286 Xon(XonKbpsEwma),
287 /// XOFF message.
288 Xoff,
289}
290
291/// A newtype wrapper for a tor stream rate limit that makes the units explicit.
292#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
293pub(crate) struct StreamRateLimit {
294 /// The rate in bytes/s.
295 rate: u64,
296}
297
298impl StreamRateLimit {
299 /// A maximum rate limit.
300 pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);
301
302 /// A rate limit of 0.
303 pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);
304
305 /// A new [`StreamRateLimit`] with `rate` bytes/s.
306 pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
307 Self { rate }
308 }
309
310 /// The rate in bytes/s.
311 pub(crate) const fn bytes_per_sec(&self) -> u64 {
312 self.rate
313 }
314}
315
316impl std::fmt::Display for StreamRateLimit {
317 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318 write!(f, "{} bytes/s", self.rate)
319 }
320}
321
322/// A marker type for a [`NotifySender`] indicating that notifications are for new drain rate
323/// requests.
324#[derive(Debug)]
325pub(crate) struct DrainRateRequest;