tor_cell/relaycell/
flow_ctrl.rs

1//! Cells for flow control (excluding "sendme" cells).
2
3use std::num::NonZero;
4
5use derive_deftly::Deftly;
6use tor_bytes::{EncodeResult, Error, Reader, Writer};
7use tor_memquota::derive_deftly_template_HasMemoryCost;
8
9use crate::relaycell::msg::Body;
10
11/// An `XON` relay message.
12#[derive(Clone, Debug, Deftly)]
13#[derive_deftly(HasMemoryCost)]
14pub struct Xon {
15    /// Cell `version` field.
16    version: FlowCtrlVersion,
17    /// Cell `kbps_ewma` field.
18    kbps_ewma: XonKbpsEwma,
19}
20
21/// An `XOFF` relay message.
22#[derive(Clone, Debug, Deftly)]
23#[derive_deftly(HasMemoryCost)]
24pub struct Xoff {
25    /// Cell `version` field.
26    version: FlowCtrlVersion,
27}
28
29impl Xon {
30    /// Construct a new [`Xon`] cell.
31    pub fn new(version: FlowCtrlVersion, kbps_ewma: XonKbpsEwma) -> Self {
32        Self { version, kbps_ewma }
33    }
34
35    /// Return the version.
36    pub fn version(&self) -> FlowCtrlVersion {
37        self.version
38    }
39
40    /// Return the rate limit in kbps.
41    pub fn kbps_ewma(&self) -> XonKbpsEwma {
42        self.kbps_ewma
43    }
44}
45
46impl Body for Xon {
47    fn decode_from_reader(r: &mut Reader<'_>) -> tor_bytes::Result<Self> {
48        let version = r.take_u8()?;
49
50        let version = match version.try_into() {
51            Ok(x) => x,
52            Err(UnrecognizedVersionError) => {
53                return Err(Error::InvalidMessage("Unrecognized XON version.".into()));
54            }
55        };
56
57        let kbps_ewma = XonKbpsEwma::decode(r.take_u32()?);
58
59        Ok(Self::new(version, kbps_ewma))
60    }
61
62    fn encode_onto<W: Writer + ?Sized>(self, w: &mut W) -> EncodeResult<()> {
63        w.write_u8(*self.version);
64        w.write_u32(self.kbps_ewma.encode());
65        Ok(())
66    }
67}
68
69impl Xoff {
70    /// Construct a new [`Xoff`] cell.
71    pub fn new(version: FlowCtrlVersion) -> Self {
72        Self { version }
73    }
74
75    /// Return the version.
76    pub fn version(&self) -> FlowCtrlVersion {
77        self.version
78    }
79}
80
81impl Body for Xoff {
82    fn decode_from_reader(r: &mut Reader<'_>) -> tor_bytes::Result<Self> {
83        let version = r.take_u8()?;
84
85        let version = match version.try_into() {
86            Ok(x) => x,
87            Err(UnrecognizedVersionError) => {
88                return Err(Error::InvalidMessage("Unrecognized XOFF version.".into()));
89            }
90        };
91
92        Ok(Self::new(version))
93    }
94
95    fn encode_onto<W: Writer + ?Sized>(self, w: &mut W) -> EncodeResult<()> {
96        w.write_u8(*self.version);
97        Ok(())
98    }
99}
100
101/// A recognized flow control version.
102#[derive(Copy, Clone, Debug, Deftly)]
103#[derive_deftly(HasMemoryCost)]
104pub struct FlowCtrlVersion(u8);
105
106impl TryFrom<u8> for FlowCtrlVersion {
107    type Error = UnrecognizedVersionError;
108
109    fn try_from(x: u8) -> Result<Self, Self::Error> {
110        if x != 0 {
111            return Err(UnrecognizedVersionError);
112        }
113
114        Ok(Self(x))
115    }
116}
117
118impl std::ops::Deref for FlowCtrlVersion {
119    type Target = u8;
120
121    fn deref(&self) -> &Self::Target {
122        &self.0
123    }
124}
125
126/// The XON/XOFF cell version was not recognized.
127#[derive(Clone, Debug)]
128#[non_exhaustive]
129pub struct UnrecognizedVersionError;
130
131/// The `kbps_ewma` field of an XON cell.
132#[derive(Copy, Clone, Debug, Deftly)]
133#[derive_deftly(HasMemoryCost)]
134#[allow(clippy::exhaustive_enums)]
135pub enum XonKbpsEwma {
136    /// Stream is rate limited to the value in kbps.
137    Limited(NonZero<u32>),
138    /// Stream is not rate limited.
139    Unlimited,
140}
141
142impl XonKbpsEwma {
143    /// Decode the `kbps_ewma` field of an XON cell.
144    fn decode(kbps_ewma: u32) -> Self {
145        // prop-324:
146        // > In `xon_cell`, a zero value for `kbps_ewma` means that the stream's rate is unlimited.
147        match NonZero::new(kbps_ewma) {
148            Some(x) => Self::Limited(x),
149            None => Self::Unlimited,
150        }
151    }
152
153    /// Encode as the `kbps_ewma` field of an XON cell.
154    fn encode(&self) -> u32 {
155        // prop-324:
156        // > In `xon_cell`, a zero value for `kbps_ewma` means that the stream's rate is unlimited.
157        match self {
158            Self::Limited(x) => x.get(),
159            Self::Unlimited => 0,
160        }
161    }
162}