tor_proto/stream/
flow_control.rs

1//! Code for implementing flow control (stream-level).
2
3use tor_cell::relaycell::RelayMsg;
4
5use crate::congestion::sendme;
6use crate::{Error, Result};
7
8/// Private internals of [`StreamSendFlowControl`].
9#[derive(Debug)]
10enum StreamSendFlowControlEnum {
11    /// "legacy" sendme-window-based flow control.
12    WindowBased(sendme::StreamSendWindow),
13    /// XON/XOFF flow control.
14    XonXoffBased,
15}
16
17/// Manages outgoing flow control for a stream.
18#[derive(Debug)]
19pub(crate) struct StreamSendFlowControl {
20    /// Private internal enum.
21    e: StreamSendFlowControlEnum,
22}
23
24impl StreamSendFlowControl {
25    /// Returns a new sendme-window-based [`StreamSendFlowControl`].
26    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
27    // Unclear whether we need or want to support creating this object from a
28    // preexisting StreamSendWindow.
29    pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
30        Self {
31            e: StreamSendFlowControlEnum::WindowBased(window),
32        }
33    }
34
35    /// Returns a new xon/xoff-based [`StreamSendFlowControl`].
36    ///
37    /// **NOTE:** This isn't actually implemented yet,
38    /// and is currently a no-op congestion control.
39    // TODO(#534): remove the note above
40    pub(crate) fn new_xon_xoff_based() -> Self {
41        Self {
42            e: StreamSendFlowControlEnum::XonXoffBased,
43        }
44    }
45
46    /// Whether this stream is ready to send `msg`.
47    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
48        match &self.e {
49            StreamSendFlowControlEnum::WindowBased(w) => {
50                !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
51            }
52            StreamSendFlowControlEnum::XonXoffBased => {
53                // TODO(#534): xon-based will depend on number of bytes in the body of DATA messages
54                true
55            }
56        }
57    }
58
59    /// Take capacity to send `msg`. If there's insufficient capacity, returns
60    /// an error.
61    // TODO: Consider having this method wrap the message in a type that
62    // "proves" we've applied flow control. This would make it easier to apply
63    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
64    // ambiguity in the sending function as to whether flow control has already
65    // been applied or not.
66    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
67        match &mut self.e {
68            StreamSendFlowControlEnum::WindowBased(w) => {
69                if sendme::cmd_counts_towards_windows(msg.cmd()) {
70                    w.take().map(|_| ())
71                } else {
72                    // TODO: Maybe make this an error?
73                    // Ideally caller would have checked this already.
74                    Ok(())
75                }
76            }
77            StreamSendFlowControlEnum::XonXoffBased => {
78                // TODO(#534): xon-based will update state based on number of bytes in the body of
79                // DATA messages
80                Ok(())
81            }
82        }
83    }
84
85    /// Handle an incoming sendme.
86    ///
87    /// On success, return the number of cells left in the window.
88    ///
89    /// On failure, return an error: the caller should close the stream or
90    /// circuit with a protocol error.
91    pub(crate) fn put_for_incoming_sendme(&mut self) -> Result<()> {
92        match &mut self.e {
93            StreamSendFlowControlEnum::WindowBased(w) => w.put(),
94            StreamSendFlowControlEnum::XonXoffBased => Err(Error::CircProto(
95                "Stream level SENDME not allowed due to congestion control".into(),
96            )),
97        }
98    }
99
100    // TODO(#534): Add methods for handling incoming xon, xoff.
101}