tor_cell/relaycell/
flow_ctrl.rs

1//! Cells for flow control (excluding "sendme" cells).
2
3use std::num::NonZero;
4
5use derive_deftly::Deftly;
6use tor_bytes::{EncodeResult, Error, Reader, Writer};
7use tor_memquota::derive_deftly_template_HasMemoryCost;
8
9use crate::relaycell::msg::Body;
10
11/// An `XON` relay message.
12#[derive(Clone, Debug, Deftly)]
13#[derive_deftly(HasMemoryCost)]
14pub struct Xon {
15    /// Cell `version` field.
16    version: FlowCtrlVersion,
17    /// Cell `kbps_ewma` field.
18    kbps_ewma: XonKbpsEwma,
19}
20
21/// An `XOFF` relay message.
22#[derive(Clone, Debug, Deftly)]
23#[derive_deftly(HasMemoryCost)]
24pub struct Xoff {
25    /// Cell `version` field.
26    version: FlowCtrlVersion,
27}
28
29impl Xon {
30    /// Construct a new [`Xon`] cell.
31    pub fn new(version: FlowCtrlVersion, kbps_ewma: XonKbpsEwma) -> Self {
32        Self { version, kbps_ewma }
33    }
34
35    /// Return the version.
36    pub fn version(&self) -> FlowCtrlVersion {
37        self.version
38    }
39
40    /// Return the rate limit in kbps.
41    pub fn kbps_ewma(&self) -> XonKbpsEwma {
42        self.kbps_ewma
43    }
44}
45
46impl Body for Xon {
47    fn decode_from_reader(r: &mut Reader<'_>) -> tor_bytes::Result<Self> {
48        let version = r.take_u8()?;
49
50        let version = match FlowCtrlVersion::new(version) {
51            Ok(x) => x,
52            Err(UnrecognizedVersionError) => {
53                return Err(Error::InvalidMessage("Unrecognized XON version.".into()));
54            }
55        };
56
57        let kbps_ewma = XonKbpsEwma::decode(r.take_u32()?);
58
59        Ok(Self::new(version, kbps_ewma))
60    }
61
62    fn encode_onto<W: Writer + ?Sized>(self, w: &mut W) -> EncodeResult<()> {
63        w.write_u8(*self.version);
64        w.write_u32(self.kbps_ewma.encode());
65        Ok(())
66    }
67}
68
69impl Xoff {
70    /// Construct a new [`Xoff`] cell.
71    pub fn new(version: FlowCtrlVersion) -> Self {
72        Self { version }
73    }
74
75    /// Return the version.
76    pub fn version(&self) -> FlowCtrlVersion {
77        self.version
78    }
79}
80
81impl Body for Xoff {
82    fn decode_from_reader(r: &mut Reader<'_>) -> tor_bytes::Result<Self> {
83        let version = r.take_u8()?;
84
85        let version = match FlowCtrlVersion::new(version) {
86            Ok(x) => x,
87            Err(UnrecognizedVersionError) => {
88                return Err(Error::InvalidMessage("Unrecognized XOFF version.".into()));
89            }
90        };
91
92        Ok(Self::new(version))
93    }
94
95    fn encode_onto<W: Writer + ?Sized>(self, w: &mut W) -> EncodeResult<()> {
96        w.write_u8(*self.version);
97        Ok(())
98    }
99}
100
101/// A recognized flow control version.
102#[derive(Copy, Clone, Debug, Deftly)]
103#[derive_deftly(HasMemoryCost)]
104pub struct FlowCtrlVersion(u8);
105
106impl FlowCtrlVersion {
107    /// Version 0, which is currently the only known version.
108    pub const V0: Self = Self(0);
109
110    /// If `version` is a recognized XON/XOFF version, returns a new [`FlowCtrlVersion`].
111    pub const fn new(version: u8) -> Result<Self, UnrecognizedVersionError> {
112        if version != 0 {
113            return Err(UnrecognizedVersionError);
114        }
115
116        Ok(Self(version))
117    }
118}
119
120impl TryFrom<u8> for FlowCtrlVersion {
121    type Error = UnrecognizedVersionError;
122
123    fn try_from(x: u8) -> Result<Self, Self::Error> {
124        Self::new(x)
125    }
126}
127
128impl std::ops::Deref for FlowCtrlVersion {
129    type Target = u8;
130
131    fn deref(&self) -> &Self::Target {
132        &self.0
133    }
134}
135
/// The XON/XOFF cell version was not recognized.
///
/// Returned by [`FlowCtrlVersion::new`] (and the matching `TryFrom<u8>` impl)
/// when the given version number is not a known one.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// printed, boxed as a `dyn Error`, or propagated with `?` like any other error.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct UnrecognizedVersionError;

impl std::fmt::Display for UnrecognizedVersionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("unrecognized XON/XOFF flow control version")
    }
}

// No underlying cause to report, so the default `source()` (which returns `None`) is kept.
impl std::error::Error for UnrecognizedVersionError {}
140
141/// The `kbps_ewma` field of an XON cell.
142#[derive(Copy, Clone, Debug, Deftly)]
143#[derive_deftly(HasMemoryCost)]
144#[allow(clippy::exhaustive_enums)]
145pub enum XonKbpsEwma {
146    /// Stream is rate limited to the value in kbps.
147    Limited(NonZero<u32>),
148    /// Stream is not rate limited.
149    Unlimited,
150}
151
152impl XonKbpsEwma {
153    /// Decode the `kbps_ewma` field of an XON cell.
154    fn decode(kbps_ewma: u32) -> Self {
155        // prop-324:
156        // > In `xon_cell`, a zero value for `kbps_ewma` means that the stream's rate is unlimited.
157        match NonZero::new(kbps_ewma) {
158            Some(x) => Self::Limited(x),
159            None => Self::Unlimited,
160        }
161    }
162
163    /// Encode as the `kbps_ewma` field of an XON cell.
164    fn encode(&self) -> u32 {
165        // prop-324:
166        // > In `xon_cell`, a zero value for `kbps_ewma` means that the stream's rate is unlimited.
167        match self {
168            Self::Limited(x) => x.get(),
169            Self::Unlimited => 0,
170        }
171    }
172}