1
//! Cells for flow control (excluding "sendme" cells).
2

            
3
use std::num::NonZero;
4

            
5
use derive_deftly::Deftly;
6
use tor_bytes::{EncodeResult, Error, Reader, Writer};
7
use tor_memquota::derive_deftly_template_HasMemoryCost;
8

            
9
use crate::relaycell::msg::Body;
10

            
11
/// An `XON` relay message.
12
#[derive(Clone, Debug, Deftly)]
13
#[derive_deftly(HasMemoryCost)]
14
pub struct Xon {
15
    /// Cell `version` field.
16
    version: FlowCtrlVersion,
17
    /// Cell `kbps_ewma` field.
18
    kbps_ewma: XonKbpsEwma,
19
}
20

            
21
/// An `XOFF` relay message.
22
#[derive(Clone, Debug, Deftly)]
23
#[derive_deftly(HasMemoryCost)]
24
pub struct Xoff {
25
    /// Cell `version` field.
26
    version: FlowCtrlVersion,
27
}
28

            
29
impl Xon {
30
    /// Construct a new [`Xon`] cell.
31
    pub fn new(version: FlowCtrlVersion, kbps_ewma: XonKbpsEwma) -> Self {
32
        Self { version, kbps_ewma }
33
    }
34

            
35
    /// Return the version.
36
    pub fn version(&self) -> FlowCtrlVersion {
37
        self.version
38
    }
39

            
40
    /// Return the rate limit in kbps.
41
    pub fn kbps_ewma(&self) -> XonKbpsEwma {
42
        self.kbps_ewma
43
    }
44
}
45

            
46
impl Body for Xon {
47
    fn decode_from_reader(r: &mut Reader<'_>) -> tor_bytes::Result<Self> {
48
        let version = r.take_u8()?;
49

            
50
        let version = match FlowCtrlVersion::new(version) {
51
            Ok(x) => x,
52
            Err(UnrecognizedVersionError) => {
53
                return Err(Error::InvalidMessage("Unrecognized XON version.".into()));
54
            }
55
        };
56

            
57
        let kbps_ewma = XonKbpsEwma::decode(r.take_u32()?);
58

            
59
        Ok(Self::new(version, kbps_ewma))
60
    }
61

            
62
    fn encode_onto<W: Writer + ?Sized>(self, w: &mut W) -> EncodeResult<()> {
63
        w.write_u8(*self.version);
64
        w.write_u32(self.kbps_ewma.encode());
65
        Ok(())
66
    }
67
}
68

            
69
impl Xoff {
70
    /// Construct a new [`Xoff`] cell.
71
    pub fn new(version: FlowCtrlVersion) -> Self {
72
        Self { version }
73
    }
74

            
75
    /// Return the version.
76
    pub fn version(&self) -> FlowCtrlVersion {
77
        self.version
78
    }
79
}
80

            
81
impl Body for Xoff {
82
    fn decode_from_reader(r: &mut Reader<'_>) -> tor_bytes::Result<Self> {
83
        let version = r.take_u8()?;
84

            
85
        let version = match FlowCtrlVersion::new(version) {
86
            Ok(x) => x,
87
            Err(UnrecognizedVersionError) => {
88
                return Err(Error::InvalidMessage("Unrecognized XOFF version.".into()));
89
            }
90
        };
91

            
92
        Ok(Self::new(version))
93
    }
94

            
95
    fn encode_onto<W: Writer + ?Sized>(self, w: &mut W) -> EncodeResult<()> {
96
        w.write_u8(*self.version);
97
        Ok(())
98
    }
99
}
100

            
101
/// A recognized flow control version.
102
#[derive(Copy, Clone, Debug, Deftly)]
103
#[derive_deftly(HasMemoryCost)]
104
pub struct FlowCtrlVersion(u8);
105

            
106
impl FlowCtrlVersion {
107
    /// Version 0, which is currently the only known version.
108
    pub const V0: Self = Self(0);
109

            
110
    /// If `version` is a recognized XON/XOFF version, returns a new [`FlowCtrlVersion`].
111
    pub const fn new(version: u8) -> Result<Self, UnrecognizedVersionError> {
112
        if version != 0 {
113
            return Err(UnrecognizedVersionError);
114
        }
115

            
116
        Ok(Self(version))
117
    }
118
}
119

            
120
impl TryFrom<u8> for FlowCtrlVersion {
121
    type Error = UnrecognizedVersionError;
122

            
123
    fn try_from(x: u8) -> Result<Self, Self::Error> {
124
        Self::new(x)
125
    }
126
}
127

            
128
impl std::ops::Deref for FlowCtrlVersion {
129
    type Target = u8;
130

            
131
    fn deref(&self) -> &Self::Target {
132
        &self.0
133
    }
134
}
135

            
136
/// The XON/XOFF cell version was not recognized.
137
#[derive(Clone, Debug)]
138
#[non_exhaustive]
139
pub struct UnrecognizedVersionError;
140

            
141
/// The `kbps_ewma` field of an XON cell.
142
#[derive(Copy, Clone, Debug, Deftly)]
143
#[derive_deftly(HasMemoryCost)]
144
#[allow(clippy::exhaustive_enums)]
145
pub enum XonKbpsEwma {
146
    /// Stream is rate limited to the value in kbps.
147
    Limited(NonZero<u32>),
148
    /// Stream is not rate limited.
149
    Unlimited,
150
}
151

            
152
impl XonKbpsEwma {
153
    /// Decode the `kbps_ewma` field of an XON cell.
154
    fn decode(kbps_ewma: u32) -> Self {
155
        // prop-324:
156
        // > In `xon_cell`, a zero value for `kbps_ewma` means that the stream's rate is unlimited.
157
        match NonZero::new(kbps_ewma) {
158
            Some(x) => Self::Limited(x),
159
            None => Self::Unlimited,
160
        }
161
    }
162

            
163
    /// Encode as the `kbps_ewma` field of an XON cell.
164
    fn encode(&self) -> u32 {
165
        // prop-324:
166
        // > In `xon_cell`, a zero value for `kbps_ewma` means that the stream's rate is unlimited.
167
        match self {
168
            Self::Limited(x) => x.get(),
169
            Self::Unlimited => 0,
170
        }
171
    }
172
}
173

            
174
impl std::fmt::Display for XonKbpsEwma {
175
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176
        match self {
177
            Self::Limited(rate) => write!(f, "{rate} kbps"),
178
            Self::Unlimited => write!(f, "unlimited"),
179
        }
180
    }
181
}