//! Code for implementing flow control (stream-level).
23use postage::watch;
4#[cfg(feature = "flowctl-cc")]
5use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
6use tor_cell::relaycell::msg::Sendme;
7use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
89use crate::congestion::sendme;
10use crate::{Error, Result};
1112/// Private internals of [`StreamSendFlowControl`].
13#[derive(Debug)]
14enum StreamSendFlowControlEnum {
15/// "legacy" sendme-window-based flow control.
16WindowBased(sendme::StreamSendWindow),
17/// XON/XOFF flow control.
18#[cfg(feature = "flowctl-cc")]
19XonXoffBased(XonXoffControl),
20}
2122/// Manages outgoing flow control for a stream.
23#[derive(Debug)]
24pub(crate) struct StreamSendFlowControl {
25/// Private internal enum.
26e: StreamSendFlowControlEnum,
27}
2829impl StreamSendFlowControl {
30/// Returns a new sendme-window-based [`StreamSendFlowControl`].
31// TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
32 // Unclear whether we need or want to support creating this object from a
33 // preexisting StreamSendWindow.
34pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
35Self {
36 e: StreamSendFlowControlEnum::WindowBased(window),
37 }
38 }
3940/// Returns a new xon/xoff-based [`StreamSendFlowControl`].
41#[cfg(feature = "flowctl-cc")]
42pub(crate) fn new_xon_xoff_based(rate_limit_updater: watch::Sender<StreamRateLimit>) -> Self {
43Self {
44 e: StreamSendFlowControlEnum::XonXoffBased(XonXoffControl { rate_limit_updater }),
45 }
46 }
4748/// Whether this stream is ready to send `msg`.
49pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
50match &self.e {
51 StreamSendFlowControlEnum::WindowBased(w) => {
52 !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
53}
54#[cfg(feature = "flowctl-cc")]
55StreamSendFlowControlEnum::XonXoffBased(_) => {
56// we perform rate-limiting in the `DataWriter`,
57 // so we send any messages that made it past the `DataWriter`
58true
59}
60 }
61 }
6263/// Take capacity to send `msg`. If there's insufficient capacity, returns
64 /// an error.
65// TODO: Consider having this method wrap the message in a type that
66 // "proves" we've applied flow control. This would make it easier to apply
67 // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
68 // ambiguity in the sending function as to whether flow control has already
69 // been applied or not.
70pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
71match &mut self.e {
72 StreamSendFlowControlEnum::WindowBased(w) => {
73if sendme::cmd_counts_towards_windows(msg.cmd()) {
74 w.take().map(|_| ())
75 } else {
76// TODO: Maybe make this an error?
77 // Ideally caller would have checked this already.
78Ok(())
79 }
80 }
81#[cfg(feature = "flowctl-cc")]
82StreamSendFlowControlEnum::XonXoffBased(_) => {
83// xon/xoff flow control doesn't have "capacity";
84 // the capacity is effectively controlled by the congestion control
85Ok(())
86 }
87 }
88 }
8990/// Handle an incoming sendme.
91 ///
92 /// On success, return the number of cells left in the window.
93 ///
94 /// On failure, return an error: the caller should close the stream or
95 /// circuit with a protocol error.
96 ///
97 /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
98 /// correct type of flow control.
99pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
100match &mut self.e {
101 StreamSendFlowControlEnum::WindowBased(w) => {
102let _sendme = msg
103 .decode::<Sendme>()
104 .map_err(|e| {
105 Error::from_bytes_err(e, "failed to decode stream sendme message")
106 })?
107.into_msg();
108109 w.put()
110 }
111#[cfg(feature = "flowctl-cc")]
112StreamSendFlowControlEnum::XonXoffBased(_) => Err(Error::CircProto(
113"Stream level SENDME not allowed due to congestion control".into(),
114 )),
115 }
116 }
117118/// Handle an incoming XON message.
119 ///
120 /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
121 /// correct type of flow control.
122pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
123match &mut self.e {
124 StreamSendFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
125"XON messages not allowed with window flow control".into(),
126 )),
127#[cfg(feature = "flowctl-cc")]
128StreamSendFlowControlEnum::XonXoffBased(control) => {
129let xon = msg
130 .decode::<Xon>()
131 .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
132.into_msg();
133134// > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
135 // > violation.
136if *xon.version() != 0 {
137return Err(Error::CircProto("Unrecognized XON version".into()));
138 }
139140let rate = match xon.kbps_ewma() {
141 XonKbpsEwma::Limited(rate_kbps) => {
142let rate_kbps = u64::from(rate_kbps.get());
143// convert from kbps to bytes/s
144StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
145 }
146 XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
147 };
148149*control.rate_limit_updater.borrow_mut() = rate;
150Ok(())
151 }
152 }
153 }
154155/// Handle an incoming XOFF message.
156 ///
157 /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
158 /// correct type of flow control.
159pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
160match &mut self.e {
161 StreamSendFlowControlEnum::WindowBased(_) => Err(Error::CircProto(
162"XOFF messages not allowed with window flow control".into(),
163 )),
164#[cfg(feature = "flowctl-cc")]
165StreamSendFlowControlEnum::XonXoffBased(control) => {
166let xoff = msg
167 .decode::<Xoff>()
168 .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
169.into_msg();
170171// > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
172 // > violation.
173if *xoff.version() != 0 {
174return Err(Error::CircProto("Unrecognized XOFF version".into()));
175 }
176177*control.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
178Ok(())
179 }
180 }
181 }
182}
183184/// Control state for XON/XOFF flow control.
185#[derive(Debug)]
186struct XonXoffControl {
187/// How we communicate rate limit updates to the
188 /// [`DataWriter`](crate::stream::data::DataWriter).
189rate_limit_updater: watch::Sender<StreamRateLimit>,
190}
/// A newtype wrapper for a tor stream rate limit that makes the units explicit.
///
/// Comparison and ordering follow the wrapped bytes/s value.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct StreamRateLimit {
    /// The rate in bytes/s.
    rate: u64,
}

impl StreamRateLimit {
    /// A maximum rate limit.
    pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);

    /// A rate limit of 0.
    pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);

    /// A new [`StreamRateLimit`] with `rate` bytes/s.
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
        Self { rate }
    }

    /// The rate in bytes/s.
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
        self.rate
    }
}

/// Formats the limit as `<rate> bytes/s`.
impl std::fmt::Display for StreamRateLimit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} bytes/s", self.rate)
    }
}