tor_proto/stream/
flow_control.rs

1//! Code for implementing flow control (stream-level).
2
3use postage::watch;
4#[cfg(feature = "flowctl-cc")]
5use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
6use tor_cell::relaycell::msg::Sendme;
7use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
8
9use crate::congestion::sendme;
10use crate::{Error, Result};
11
12/// Private internals of [`StreamSendFlowControl`].
13#[derive(Debug)]
14enum StreamSendFlowControlEnum {
15    /// "legacy" sendme-window-based flow control.
16    WindowBased(sendme::StreamSendWindow),
17    /// XON/XOFF flow control.
18    #[cfg(feature = "flowctl-cc")]
19    XonXoffBased(XonXoffControl),
20}
21
22/// Manages outgoing flow control for a stream.
23#[derive(Debug)]
24pub(crate) struct StreamSendFlowControl {
25    /// Private internal enum.
26    e: StreamSendFlowControlEnum,
27}
28
29impl StreamSendFlowControl {
30    /// Returns a new sendme-window-based [`StreamSendFlowControl`].
31    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
32    // Unclear whether we need or want to support creating this object from a
33    // preexisting StreamSendWindow.
34    pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
35        Self {
36            e: StreamSendFlowControlEnum::WindowBased(window),
37        }
38    }
39
40    /// Returns a new xon/xoff-based [`StreamSendFlowControl`].
41    #[cfg(feature = "flowctl-cc")]
42    pub(crate) fn new_xon_xoff_based(rate_limit_updater: watch::Sender<StreamRateLimit>) -> Self {
43        Self {
44            e: StreamSendFlowControlEnum::XonXoffBased(XonXoffControl { rate_limit_updater }),
45        }
46    }
47
48    /// Whether this stream is ready to send `msg`.
49    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
50        match &self.e {
51            StreamSendFlowControlEnum::WindowBased(w) => {
52                !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
53            }
54            #[cfg(feature = "flowctl-cc")]
55            StreamSendFlowControlEnum::XonXoffBased(_) => {
56                // we perform rate-limiting in the `DataWriter`,
57                // so we send any messages that made it past the `DataWriter`
58                true
59            }
60        }
61    }
62
63    /// Take capacity to send `msg`. If there's insufficient capacity, returns
64    /// an error.
65    // TODO: Consider having this method wrap the message in a type that
66    // "proves" we've applied flow control. This would make it easier to apply
67    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
68    // ambiguity in the sending function as to whether flow control has already
69    // been applied or not.
70    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
71        match &mut self.e {
72            StreamSendFlowControlEnum::WindowBased(w) => {
73                if sendme::cmd_counts_towards_windows(msg.cmd()) {
74                    w.take().map(|_| ())
75                } else {
76                    // TODO: Maybe make this an error?
77                    // Ideally caller would have checked this already.
78                    Ok(())
79                }
80            }
81            #[cfg(feature = "flowctl-cc")]
82            StreamSendFlowControlEnum::XonXoffBased(_) => {
83                // xon/xoff flow control doesn't have "capacity";
84                // the capacity is effectively controlled by the congestion control
85                Ok(())
86            }
87        }
88    }
89
90    /// Handle an incoming sendme.
91    ///
92    /// On success, return the number of cells left in the window.
93    ///
94    /// On failure, return an error: the caller should close the stream or
95    /// circuit with a protocol error.
96    ///
97    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
98    /// correct type of flow control.
99    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
100        match &mut self.e {
101            StreamSendFlowControlEnum::WindowBased(w) => {
102                let _sendme = msg
103                    .decode::<Sendme>()
104                    .map_err(|e| {
105                        Error::from_bytes_err(e, "failed to decode stream sendme message")
106                    })?
107                    .into_msg();
108
109                w.put()
110            }
111            #[cfg(feature = "flowctl-cc")]
112            StreamSendFlowControlEnum::XonXoffBased(_) => Err(Error::CircProto(
113                "Stream level SENDME not allowed due to congestion control".into(),
114            )),
115        }
116    }
117
118    /// Handle an incoming XON message.
119    ///
120    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
121    /// correct type of flow control.
122    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
123        match &mut self.e {
124            StreamSendFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
125                "XON messages not allowed with window flow control".into(),
126            )),
127            #[cfg(feature = "flowctl-cc")]
128            StreamSendFlowControlEnum::XonXoffBased(control) => {
129                let xon = msg
130                    .decode::<Xon>()
131                    .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
132                    .into_msg();
133
134                // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
135                // > violation.
136                if *xon.version() != 0 {
137                    return Err(Error::CircProto("Unrecognized XON version".into()));
138                }
139
140                let rate = match xon.kbps_ewma() {
141                    XonKbpsEwma::Limited(rate_kbps) => {
142                        let rate_kbps = u64::from(rate_kbps.get());
143                        // convert from kbps to bytes/s
144                        StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
145                    }
146                    XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
147                };
148
149                *control.rate_limit_updater.borrow_mut() = rate;
150                Ok(())
151            }
152        }
153    }
154
155    /// Handle an incoming XOFF message.
156    ///
157    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
158    /// correct type of flow control.
159    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
160        match &mut self.e {
161            StreamSendFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
162                "XOFF messages not allowed with window flow control".into(),
163            )),
164            #[cfg(feature = "flowctl-cc")]
165            StreamSendFlowControlEnum::XonXoffBased(control) => {
166                let xoff = msg
167                    .decode::<Xoff>()
168                    .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
169                    .into_msg();
170
171                // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
172                // > violation.
173                if *xoff.version() != 0 {
174                    return Err(Error::CircProto("Unrecognized XOFF version".into()));
175                }
176
177                *control.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
178                Ok(())
179            }
180        }
181    }
182}
183
184/// Control state for XON/XOFF flow control.
185#[derive(Debug)]
186struct XonXoffControl {
187    /// How we communicate rate limit updates to the
188    /// [`DataWriter`](crate::stream::data::DataWriter).
189    rate_limit_updater: watch::Sender<StreamRateLimit>,
190}
191
/// A tor stream rate limit, wrapped in its own type so that the unit (bytes/s) is explicit.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub(crate) struct StreamRateLimit {
    /// Bytes per second.
    rate: u64,
}

impl StreamRateLimit {
    /// The largest representable rate limit (`u64::MAX` bytes/s).
    pub(crate) const MAX: Self = Self { rate: u64::MAX };

    /// A rate limit of zero bytes/s.
    pub(crate) const ZERO: Self = Self { rate: 0 };

    /// Builds a [`StreamRateLimit`] of `rate` bytes/s.
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
        StreamRateLimit { rate }
    }

    /// Returns this rate limit expressed in bytes/s.
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
        let Self { rate } = *self;
        rate
    }
}

impl std::fmt::Display for StreamRateLimit {
    /// Formats the limit as the numeric rate followed by the unit, e.g. `42 bytes/s`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} bytes/s", self.bytes_per_sec())
    }
}