tor_proto/stream/
flow_control.rs

1//! Code for implementing flow control (stream-level).
2
3use postage::watch;
4use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
5use tor_cell::relaycell::msg::Sendme;
6use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
7
8use crate::congestion::sendme;
9use crate::util::notify::NotifySender;
10use crate::{Error, Result};
11
/// The threshold number of incoming data bytes buffered on a stream at which we send an XOFF.
///
/// The comparison is strict: an XOFF is only considered once the buffered length is greater
/// than this value, and an XON is withheld while the buffered length is still greater than it
/// (see `maybe_send_xoff()` and `maybe_send_xon()`).
// TODO(arti#534): We want to get the value from the consensus. The value in the consensus is the
// number of relay cells, not number of bytes. But do we really want to use the number of relay
// cells rather than bytes?
#[cfg(feature = "flowctl-cc")]
const CC_XOFF_CLIENT: usize = 250_000;
18
/// Private internals of [`StreamFlowControl`].
///
/// Each variant holds the state for one flow-control scheme.
#[derive(Debug)]
enum StreamFlowControlEnum {
    /// "legacy" sendme-window-based flow control.
    ///
    /// Wraps the stream-level send window.
    WindowBased(sendme::StreamSendWindow),
    /// XON/XOFF flow control.
    ///
    /// Only compiled in when the `flowctl-cc` feature is enabled.
    #[cfg(feature = "flowctl-cc")]
    XonXoffBased(XonXoffControl),
}
28
/// Manages flow control for a stream.
///
/// This is a thin wrapper whose only field is the private `StreamFlowControlEnum`, so the
/// choice of flow-control scheme is not exposed outside this module.
#[derive(Debug)]
pub(crate) struct StreamFlowControl {
    /// Private internal enum holding the state of the flow-control scheme in use.
    e: StreamFlowControlEnum,
}
35
36impl StreamFlowControl {
37    /// Returns a new sendme-window-based [`StreamFlowControl`].
38    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
39    // Unclear whether we need or want to support creating this object from a
40    // preexisting StreamSendWindow.
41    pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
42        Self {
43            e: StreamFlowControlEnum::WindowBased(window),
44        }
45    }
46
47    /// Returns a new xon/xoff-based [`StreamFlowControl`].
48    #[cfg(feature = "flowctl-cc")]
49    pub(crate) fn new_xon_xoff_based(
50        rate_limit_updater: watch::Sender<StreamRateLimit>,
51        drain_rate_requester: NotifySender<DrainRateRequest>,
52    ) -> Self {
53        Self {
54            e: StreamFlowControlEnum::XonXoffBased(XonXoffControl {
55                rate_limit_updater,
56                drain_rate_requester,
57                last_sent_xon_xoff: None,
58            }),
59        }
60    }
61
62    /// Whether this stream is ready to send `msg`.
63    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
64        match &self.e {
65            StreamFlowControlEnum::WindowBased(w) => {
66                !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
67            }
68            #[cfg(feature = "flowctl-cc")]
69            StreamFlowControlEnum::XonXoffBased(_) => {
70                // we perform rate-limiting in the `DataWriter`,
71                // so we send any messages that made it past the `DataWriter`
72                true
73            }
74        }
75    }
76
77    /// Take capacity to send `msg`. If there's insufficient capacity, returns
78    /// an error.
79    // TODO: Consider having this method wrap the message in a type that
80    // "proves" we've applied flow control. This would make it easier to apply
81    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
82    // ambiguity in the sending function as to whether flow control has already
83    // been applied or not.
84    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
85        match &mut self.e {
86            StreamFlowControlEnum::WindowBased(w) => {
87                if sendme::cmd_counts_towards_windows(msg.cmd()) {
88                    w.take().map(|_| ())
89                } else {
90                    // TODO: Maybe make this an error?
91                    // Ideally caller would have checked this already.
92                    Ok(())
93                }
94            }
95            #[cfg(feature = "flowctl-cc")]
96            StreamFlowControlEnum::XonXoffBased(_) => {
97                // xon/xoff flow control doesn't have "capacity";
98                // the capacity is effectively controlled by the congestion control
99                Ok(())
100            }
101        }
102    }
103
104    /// Handle an incoming sendme.
105    ///
106    /// On success, return the number of cells left in the window.
107    ///
108    /// On failure, return an error: the caller should close the stream or
109    /// circuit with a protocol error.
110    ///
111    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
112    /// correct type of flow control.
113    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
114        match &mut self.e {
115            StreamFlowControlEnum::WindowBased(w) => {
116                let _sendme = msg
117                    .decode::<Sendme>()
118                    .map_err(|e| {
119                        Error::from_bytes_err(e, "failed to decode stream sendme message")
120                    })?
121                    .into_msg();
122
123                w.put()
124            }
125            #[cfg(feature = "flowctl-cc")]
126            StreamFlowControlEnum::XonXoffBased(_) => Err(Error::CircProto(
127                "Stream level SENDME not allowed due to congestion control".into(),
128            )),
129        }
130    }
131
132    /// Handle an incoming XON message.
133    ///
134    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
135    /// correct type of flow control.
136    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
137        match &mut self.e {
138            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
139                "XON messages not allowed with window flow control".into(),
140            )),
141            #[cfg(feature = "flowctl-cc")]
142            StreamFlowControlEnum::XonXoffBased(control) => {
143                let xon = msg
144                    .decode::<Xon>()
145                    .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
146                    .into_msg();
147
148                // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
149                // > violation.
150                if *xon.version() != 0 {
151                    return Err(Error::CircProto("Unrecognized XON version".into()));
152                }
153
154                let rate = match xon.kbps_ewma() {
155                    XonKbpsEwma::Limited(rate_kbps) => {
156                        let rate_kbps = u64::from(rate_kbps.get());
157                        // convert from kbps to bytes/s
158                        StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
159                    }
160                    XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
161                };
162
163                *control.rate_limit_updater.borrow_mut() = rate;
164                Ok(())
165            }
166        }
167    }
168
169    /// Handle an incoming XOFF message.
170    ///
171    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
172    /// correct type of flow control.
173    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
174        match &mut self.e {
175            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
176                "XOFF messages not allowed with window flow control".into(),
177            )),
178            #[cfg(feature = "flowctl-cc")]
179            StreamFlowControlEnum::XonXoffBased(control) => {
180                let xoff = msg
181                    .decode::<Xoff>()
182                    .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
183                    .into_msg();
184
185                // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
186                // > violation.
187                if *xoff.version() != 0 {
188                    return Err(Error::CircProto("Unrecognized XOFF version".into()));
189                }
190
191                *control.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
192                Ok(())
193            }
194        }
195    }
196
197    /// Check if we should send an XON message.
198    ///
199    /// If we should, then returns the XON message that should be sent.
200    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
201    pub(crate) fn maybe_send_xon(
202        &mut self,
203        rate: XonKbpsEwma,
204        buffer_len: usize,
205    ) -> Result<Option<Xon>> {
206        match &mut self.e {
207            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
208                "XON messages cannot be sent with window flow control".into(),
209            )),
210            #[cfg(feature = "flowctl-cc")]
211            StreamFlowControlEnum::XonXoffBased(control) => {
212                if buffer_len > CC_XOFF_CLIENT {
213                    // we can't send an XON, and we should have already sent an XOFF when the queue first
214                    // exceeded the limit (see `maybe_send_xoff()`)
215                    debug_assert!(matches!(
216                        control.last_sent_xon_xoff,
217                        Some(LastSentXonXoff::Xoff),
218                    ));
219
220                    // inform the stream reader that we need a new drain rate
221                    control.drain_rate_requester.notify();
222                    return Ok(None);
223                }
224
225                control.last_sent_xon_xoff = Some(LastSentXonXoff::Xon(rate));
226
227                Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
228            }
229        }
230    }
231
232    /// Check if we should send an XOFF message.
233    ///
234    /// If we should, then returns the XOFF message that should be sent.
235    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
236    pub(crate) fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
237        match &mut self.e {
238            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
239                "XOFF messages cannot be sent with window flow control".into(),
240            )),
241            #[cfg(feature = "flowctl-cc")]
242            StreamFlowControlEnum::XonXoffBased(control) => {
243                // if the last XON/XOFF we sent was an XOFF, no need to send another
244                if matches!(control.last_sent_xon_xoff, Some(LastSentXonXoff::Xoff)) {
245                    return Ok(None);
246                }
247
248                if buffer_len <= CC_XOFF_CLIENT {
249                    return Ok(None);
250                }
251
252                // either we have never sent an XOFF or XON, or we last sent an XON
253
254                // remember that we last sent an XOFF
255                control.last_sent_xon_xoff = Some(LastSentXonXoff::Xoff);
256
257                // inform the stream reader that we need a new drain rate
258                control.drain_rate_requester.notify();
259
260                Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
261            }
262        }
263    }
264}
265
/// Control state for XON/XOFF flow control.
#[derive(Debug)]
struct XonXoffControl {
    /// How we communicate rate limit updates to the
    /// [`DataWriter`](crate::stream::data::DataWriter).
    ///
    /// Written whenever an incoming XON (new rate) or XOFF (zero rate) is handled.
    rate_limit_updater: watch::Sender<StreamRateLimit>,
    /// How we communicate requests for new drain rate updates to the
    /// [`XonXoffReader`](crate::stream::xon_xoff::XonXoffReader).
    drain_rate_requester: NotifySender<DrainRateRequest>,
    /// The last XON/XOFF message we sent, or `None` if we have not sent either one yet.
    last_sent_xon_xoff: Option<LastSentXonXoff>,
}
278
/// The last XON/XOFF message that we sent.
#[derive(Debug)]
enum LastSentXonXoff {
    /// XON message with a rate.
    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
    // If that doesn't end up being the case, then we should remove it.
    // The rate is stored when an XON is sent but is not read back anywhere in this module,
    // hence the `dead_code` expectation below.
    #[expect(dead_code)]
    Xon(XonKbpsEwma),
    /// XOFF message.
    Xoff,
}
290
/// A rate limit for a tor stream, wrapped in a newtype so that the unit (bytes/s) is explicit.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct StreamRateLimit {
    /// Bytes per second.
    rate: u64,
}

impl StreamRateLimit {
    /// The largest representable rate limit (`u64::MAX` bytes/s).
    pub(crate) const MAX: Self = Self { rate: u64::MAX };

    /// A rate limit of 0 bytes/s.
    pub(crate) const ZERO: Self = Self { rate: 0 };

    /// Creates a [`StreamRateLimit`] of `rate` bytes/s.
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
        Self { rate }
    }

    /// Returns the rate limit in bytes/s.
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
        self.rate
    }
}

impl std::fmt::Display for StreamRateLimit {
    /// Formats the limit as the number of bytes followed by the unit, e.g. `125000 bytes/s`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let bytes_per_sec = self.bytes_per_sec();
        write!(f, "{} bytes/s", bytes_per_sec)
    }
}
321
/// A marker type for a [`NotifySender`] indicating that notifications are for new drain rate
/// requests.
///
/// It carries no data; it is only used as the type parameter of the sender.
#[derive(Debug)]
pub(crate) struct DrainRateRequest;