1
//! Code for implementing flow control (stream-level).
2

            
3
use tor_cell::relaycell::RelayMsg;
4

            
5
use crate::congestion::sendme;
6
use crate::{Error, Result};
7

            
8
/// Private internals of [`StreamSendFlowControl`].
9
#[derive(Debug)]
10
enum StreamSendFlowControlEnum {
11
    /// "legacy" sendme-window-based flow control.
12
    WindowBased(sendme::StreamSendWindow),
13
    /// XON/XOFF flow control.
14
    XonXoffBased,
15
}
16

            
17
/// Manages outgoing flow control for a stream.
18
#[derive(Debug)]
19
pub(crate) struct StreamSendFlowControl {
20
    /// Private internal enum.
21
    e: StreamSendFlowControlEnum,
22
}
23

            
24
impl StreamSendFlowControl {
25
    /// Returns a new sendme-window-based [`StreamSendFlowControl`].
26
    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
27
    // Unclear whether we need or want to support creating this object from a
28
    // preexisting StreamSendWindow.
29
346
    pub(crate) fn new_window_based(window: sendme::StreamSendWindow) -> Self {
30
346
        Self {
31
346
            e: StreamSendFlowControlEnum::WindowBased(window),
32
346
        }
33
346
    }
34

            
35
    /// Returns a new xon/xoff-based [`StreamSendFlowControl`].
36
    ///
37
    /// **NOTE:** This isn't actually implemented yet,
38
    /// and is currently a no-op congestion control.
39
    // TODO(#534): remove the note above
40
    pub(crate) fn new_xon_xoff_based() -> Self {
41
        Self {
42
            e: StreamSendFlowControlEnum::XonXoffBased,
43
        }
44
    }
45

            
46
    /// Whether this stream is ready to send `msg`.
47
13728
    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
48
13728
        match &self.e {
49
13728
            StreamSendFlowControlEnum::WindowBased(w) => {
50
13728
                !sendme::cmd_counts_towards_windows(msg.cmd()) || w.window() > 0
51
            }
52
            StreamSendFlowControlEnum::XonXoffBased => {
53
                // TODO(#534): xon-based will depend on number of bytes in the body of DATA messages
54
                true
55
            }
56
        }
57
13728
    }
58

            
59
    /// Take capacity to send `msg`. If there's insufficient capacity, returns
60
    /// an error.
61
    // TODO: Consider having this method wrap the message in a type that
62
    // "proves" we've applied flow control. This would make it easier to apply
63
    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
64
    // ambiguity in the sending function as to whether flow control has already
65
    // been applied or not.
66
2728
    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
67
2728
        match &mut self.e {
68
2728
            StreamSendFlowControlEnum::WindowBased(w) => {
69
2728
                if sendme::cmd_counts_towards_windows(msg.cmd()) {
70
2728
                    w.take().map(|_| ())
71
                } else {
72
                    // TODO: Maybe make this an error?
73
                    // Ideally caller would have checked this already.
74
                    Ok(())
75
                }
76
            }
77
            StreamSendFlowControlEnum::XonXoffBased => {
78
                // TODO(#534): xon-based will update state based on number of bytes in the body of
79
                // DATA messages
80
                Ok(())
81
            }
82
        }
83
2728
    }
84

            
85
    /// Handle an incoming sendme.
86
    ///
87
    /// On success, return the number of cells left in the window.
88
    ///
89
    /// On failure, return an error: the caller should close the stream or
90
    /// circuit with a protocol error.
91
8
    pub(crate) fn put_for_incoming_sendme(&mut self) -> Result<()> {
92
8
        match &mut self.e {
93
8
            StreamSendFlowControlEnum::WindowBased(w) => w.put(),
94
            StreamSendFlowControlEnum::XonXoffBased => Err(Error::CircProto(
95
                "Stream level SENDME not allowed due to congestion control".into(),
96
            )),
97
        }
98
8
    }
99

            
100
    // TODO(#534): Add methods for handling incoming xon, xoff.
101
}