tor_proto/stream/flow_control.rs
1//! Code for implementing flow control (stream-level).
2
3use tor_cell::relaycell::RelayMsg;
4
5use crate::congestion::sendme;
6use crate::{Error, Result};
7
8/// Private internals of [`StreamSendFlowControl`].
9#[derive(Debug)]
10enum StreamSendFlowControlEnum {
11 /// "legacy" sendme-window-based flow control.
12 WindowBased(sendme::StreamSendWindow),
13 /// XON/XOFF flow control.
14 XonXoffBased,
15}
16
17/// Manages outgoing flow control for a stream.
18#[derive(Debug)]
19pub(crate) struct StreamSendFlowControl {
20 /// Private internal enum.
21 e: StreamSendFlowControlEnum,
22}
23
24impl StreamSendFlowControl {
25 /// Returns a new sendme-window-based [`StreamSendFlowControl`].
26 // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
27 // Unclear whether we need or want to support creating this object from a
28 // preexisting StreamSendWindow.
29 pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
30 Self {
31 e: StreamSendFlowControlEnum::WindowBased(window),
32 }
33 }
34
35 /// Returns a new xon/xoff-based [`StreamSendFlowControl`].
36 ///
37 /// **NOTE:** This isn't actually implemented yet,
38 /// and is currently a no-op congestion control.
39 // TODO(#534): remove the note above
40 pub(crate) fn new_xon_xoff_based() -> Self {
41 Self {
42 e: StreamSendFlowControlEnum::XonXoffBased,
43 }
44 }
45
46 /// Whether this stream is ready to send `msg`.
47 pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
48 match &self.e {
49 StreamSendFlowControlEnum::WindowBased(w) => {
50 !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
51 }
52 StreamSendFlowControlEnum::XonXoffBased => {
53 // TODO(#534): xon-based will depend on number of bytes in the body of DATA messages
54 true
55 }
56 }
57 }
58
59 /// Take capacity to send `msg`. If there's insufficient capacity, returns
60 /// an error.
61 // TODO: Consider having this method wrap the message in a type that
62 // "proves" we've applied flow control. This would make it easier to apply
63 // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
64 // ambiguity in the sending function as to whether flow control has already
65 // been applied or not.
66 pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
67 match &mut self.e {
68 StreamSendFlowControlEnum::WindowBased(w) => {
69 if sendme::cmd_counts_towards_windows(msg.cmd()) {
70 w.take().map(|_| ())
71 } else {
72 // TODO: Maybe make this an error?
73 // Ideally caller would have checked this already.
74 Ok(())
75 }
76 }
77 StreamSendFlowControlEnum::XonXoffBased => {
78 // TODO(#534): xon-based will update state based on number of bytes in the body of
79 // DATA messages
80 Ok(())
81 }
82 }
83 }
84
85 /// Handle an incoming sendme.
86 ///
87 /// On success, return the number of cells left in the window.
88 ///
89 /// On failure, return an error: the caller should close the stream or
90 /// circuit with a protocol error.
91 pub(crate) fn put_for_incoming_sendme(&mut self) -> Result<()> {
92 match &mut self.e {
93 StreamSendFlowControlEnum::WindowBased(w) => w.put(),
94 StreamSendFlowControlEnum::XonXoffBased => Err(Error::CircProto(
95 "Stream level SENDME not allowed due to congestion control".into(),
96 )),
97 }
98 }
99
100 // TODO(#534): Add methods for handling incoming xon, xoff.
101}