1
//! Code for implementing flow control (stream-level).
2

            
3
use postage::watch;
4
use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
5
use tor_cell::relaycell::msg::Sendme;
6
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
7

            
8
use crate::congestion::sendme;
9
use crate::util::notify::NotifySender;
10
use crate::{Error, Result};
11

            
12
/// The threshold number of incoming data bytes buffered on a stream at which we send an XOFF.
///
/// The comparison is strict: an XOFF is only produced (and an XON is only withheld) once the
/// buffered length is greater than this value.
// TODO(arti#534): We want to get the value from the consensus. The value in the consensus is the
// number of relay cells, not number of bytes. But do we really want to use the number of relay
// cells rather than bytes?
#[cfg(feature = "flowctl-cc")]
const CC_XOFF_CLIENT: usize = 250_000;
18

            
19
/// Private internals of [`StreamFlowControl`].
20
#[derive(Debug)]
21
enum StreamFlowControlEnum {
22
    /// "legacy" sendme-window-based flow control.
23
    WindowBased(sendme::StreamSendWindow),
24
    /// XON/XOFF flow control.
25
    #[cfg(feature = "flowctl-cc")]
26
    XonXoffBased(XonXoffControl),
27
}
28

            
29
/// Manages flow control for a stream.
30
#[derive(Debug)]
31
pub(crate) struct StreamFlowControl {
32
    /// Private internal enum.
33
    e: StreamFlowControlEnum,
34
}
35

            
36
impl StreamFlowControl {
37
    /// Returns a new sendme-window-based [`StreamFlowControl`].
38
    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
39
    // Unclear whether we need or want to support creating this object from a
40
    // preexisting StreamSendWindow.
41
346
    pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
42
346
        Self {
43
346
            e: StreamFlowControlEnum::WindowBased(window),
44
346
        }
45
346
    }
46

            
47
    /// Returns a new xon/xoff-based [`StreamFlowControl`].
48
    #[cfg(feature = "flowctl-cc")]
49
12
    pub(crate) fn new_xon_xoff_based(
50
12
        rate_limit_updater: watch::Sender<StreamRateLimit>,
51
12
        drain_rate_requester: NotifySender<DrainRateRequest>,
52
12
    ) -> Self {
53
12
        Self {
54
12
            e: StreamFlowControlEnum::XonXoffBased(XonXoffControl {
55
12
                rate_limit_updater,
56
12
                drain_rate_requester,
57
12
                last_sent_xon_xoff: None,
58
12
            }),
59
12
        }
60
12
    }
61

            
62
    /// Whether this stream is ready to send `msg`.
63
20168
    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
64
20168
        match &self.e {
65
14168
            StreamFlowControlEnum::WindowBased(w) => {
66
14168
                !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
67
            }
68
            #[cfg(feature = "flowctl-cc")]
69
            StreamFlowControlEnum::XonXoffBased(_) => {
70
                // we perform rate-limiting in the `DataWriter`,
71
                // so we send any messages that made it past the `DataWriter`
72
6000
                true
73
            }
74
        }
75
20168
    }
76

            
77
    /// Take capacity to send `msg`. If there's insufficient capacity, returns
78
    /// an error.
79
    // TODO: Consider having this method wrap the message in a type that
80
    // "proves" we've applied flow control. This would make it easier to apply
81
    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
82
    // ambiguity in the sending function as to whether flow control has already
83
    // been applied or not.
84
4016
    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
85
4016
        match &mut self.e {
86
2816
            StreamFlowControlEnum::WindowBased(w) => {
87
2816
                if sendme::cmd_counts_towards_windows(msg.cmd()) {
88
2816
                    w.take().map(|_| ())
89
                } else {
90
                    // TODO: Maybe make this an error?
91
                    // Ideally caller would have checked this already.
92
                    Ok(())
93
                }
94
            }
95
            #[cfg(feature = "flowctl-cc")]
96
            StreamFlowControlEnum::XonXoffBased(_) => {
97
                // xon/xoff flow control doesn't have "capacity";
98
                // the capacity is effectively controlled by the congestion control
99
1200
                Ok(())
100
            }
101
        }
102
4016
    }
103

            
104
    /// Handle an incoming sendme.
105
    ///
106
    /// On success, return the number of cells left in the window.
107
    ///
108
    /// On failure, return an error: the caller should close the stream or
109
    /// circuit with a protocol error.
110
    ///
111
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
112
    /// correct type of flow control.
113
8
    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
114
8
        match &mut self.e {
115
8
            StreamFlowControlEnum::WindowBased(w) => {
116
8
                let _sendme = msg
117
8
                    .decode::<Sendme>()
118
8
                    .map_err(|e| {
119
                        Error::from_bytes_err(e, "failed to decode stream sendme message")
120
8
                    })?
121
8
                    .into_msg();
122
8

            
123
8
                w.put()
124
            }
125
            #[cfg(feature = "flowctl-cc")]
126
            StreamFlowControlEnum::XonXoffBased(_) => Err(Error::CircProto(
127
                "Stream level SENDME not allowed due to congestion control".into(),
128
            )),
129
        }
130
8
    }
131

            
132
    /// Handle an incoming XON message.
133
    ///
134
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
135
    /// correct type of flow control.
136
    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
137
        match &mut self.e {
138
            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
139
                "XON messages not allowed with window flow control".into(),
140
            )),
141
            #[cfg(feature = "flowctl-cc")]
142
            StreamFlowControlEnum::XonXoffBased(control) => {
143
                let xon = msg
144
                    .decode::<Xon>()
145
                    .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
146
                    .into_msg();
147

            
148
                // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
149
                // > violation.
150
                if *xon.version() != 0 {
151
                    return Err(Error::CircProto("Unrecognized XON version".into()));
152
                }
153

            
154
                let rate = match xon.kbps_ewma() {
155
                    XonKbpsEwma::Limited(rate_kbps) => {
156
                        let rate_kbps = u64::from(rate_kbps.get());
157
                        // convert from kbps to bytes/s
158
                        StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
159
                    }
160
                    XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
161
                };
162

            
163
                *control.rate_limit_updater.borrow_mut() = rate;
164
                Ok(())
165
            }
166
        }
167
    }
168

            
169
    /// Handle an incoming XOFF message.
170
    ///
171
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
172
    /// correct type of flow control.
173
    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
174
        match &mut self.e {
175
            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
176
                "XOFF messages not allowed with window flow control".into(),
177
            )),
178
            #[cfg(feature = "flowctl-cc")]
179
            StreamFlowControlEnum::XonXoffBased(control) => {
180
                let xoff = msg
181
                    .decode::<Xoff>()
182
                    .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
183
                    .into_msg();
184

            
185
                // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
186
                // > violation.
187
                if *xoff.version() != 0 {
188
                    return Err(Error::CircProto("Unrecognized XOFF version".into()));
189
                }
190

            
191
                *control.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
192
                Ok(())
193
            }
194
        }
195
    }
196

            
197
    /// Check if we should send an XON message.
198
    ///
199
    /// If we should, then returns the XON message that should be sent.
200
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
201
    pub(crate) fn maybe_send_xon(
202
        &mut self,
203
        rate: XonKbpsEwma,
204
        buffer_len: usize,
205
    ) -> Result<Option<Xon>> {
206
        match &mut self.e {
207
            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
208
                "XON messages cannot be sent with window flow control".into(),
209
            )),
210
            #[cfg(feature = "flowctl-cc")]
211
            StreamFlowControlEnum::XonXoffBased(control) => {
212
                if buffer_len > CC_XOFF_CLIENT {
213
                    // we can't send an XON, and we should have already sent an XOFF when the queue first
214
                    // exceeded the limit (see `maybe_send_xoff()`)
215
                    debug_assert!(matches!(
216
                        control.last_sent_xon_xoff,
217
                        Some(LastSentXonXoff::Xoff),
218
                    ));
219

            
220
                    // inform the stream reader that we need a new drain rate
221
                    control.drain_rate_requester.notify();
222
                    return Ok(None);
223
                }
224

            
225
                control.last_sent_xon_xoff = Some(LastSentXonXoff::Xon(rate));
226

            
227
                Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
228
            }
229
        }
230
    }
231

            
232
    /// Check if we should send an XOFF message.
233
    ///
234
    /// If we should, then returns the XOFF message that should be sent.
235
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
236
64
    pub(crate) fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
237
64
        match &mut self.e {
238
            StreamFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
239
                "XOFF messages cannot be sent with window flow control".into(),
240
            )),
241
            #[cfg(feature = "flowctl-cc")]
242
64
            StreamFlowControlEnum::XonXoffBased(control) => {
243
                // if the last XON/XOFF we sent was an XOFF, no need to send another
244
64
                if matches!(control.last_sent_xon_xoff, Some(LastSentXonXoff::Xoff)) {
245
                    return Ok(None);
246
64
                }
247
64

            
248
64
                if buffer_len <= CC_XOFF_CLIENT {
249
64
                    return Ok(None);
250
                }
251

            
252
                // either we have never sent an XOFF or XON, or we last sent an XON
253

            
254
                // remember that we last sent an XOFF
255
                control.last_sent_xon_xoff = Some(LastSentXonXoff::Xoff);
256

            
257
                // inform the stream reader that we need a new drain rate
258
                control.drain_rate_requester.notify();
259

            
260
                Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
261
            }
262
        }
263
64
    }
264
}
265

            
266
/// Control state for XON/XOFF flow control.
267
#[derive(Debug)]
268
struct XonXoffControl {
269
    /// How we communicate rate limit updates to the
270
    /// [`DataWriter`](crate::stream::data::DataWriter).
271
    rate_limit_updater: watch::Sender<StreamRateLimit>,
272
    /// How we communicate requests for new drain rate updates to the
273
    /// [`XonXoffReader`](crate::stream::xon_xoff::XonXoffReader).
274
    drain_rate_requester: NotifySender<DrainRateRequest>,
275
    /// The last rate limit we sent.
276
    last_sent_xon_xoff: Option<LastSentXonXoff>,
277
}
278

            
279
/// The last XON/XOFF message that we sent.
280
#[derive(Debug)]
281
enum LastSentXonXoff {
282
    /// XON message with a rate.
283
    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
284
    // If that doesn't end up being the case, then we should remove it.
285
    #[expect(dead_code)]
286
    Xon(XonKbpsEwma),
287
    /// XOFF message.
288
    Xoff,
289
}
290

            
291
/// A newtype wrapper for a tor stream rate limit that makes the units explicit.
///
/// The wrapped value is always a rate in bytes per second.
/// Limits compare and order by that rate.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct StreamRateLimit {
    /// The rate in bytes/s.
    rate: u64,
}

impl StreamRateLimit {
    /// A maximum rate limit (`u64::MAX` bytes/s).
    pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);

    /// A rate limit of 0.
    pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);

    /// A new [`StreamRateLimit`] with `rate` bytes/s.
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
        Self { rate }
    }

    /// The rate in bytes/s.
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
        self.rate
    }
}

impl std::fmt::Display for StreamRateLimit {
    /// Formats the limit as the decimal rate followed by the unit, e.g. `125000 bytes/s`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} bytes/s", self.rate)
    }
}
321

            
322
/// A marker type for a [`NotifySender`] indicating that notifications are for new drain rate
/// requests.
///
/// This type carries no data; it only exists to be used as a type parameter.
#[derive(Debug)]
pub(crate) struct DrainRateRequest;