//! Management for flow control windows.
//!
//! Tor maintains separate windows on circuits and on streams.
//! These are controlled by SENDME cells, which (confusingly) are
//! applied either at the circuit or the stream level depending on
//! whether they have a stream ID set.
//!
//! Circuit sendmes are _authenticated_: they include a cryptographic
//! tag generated by the cryptography layer.  This tag proves that the
//! other side of the circuit really has read all of the data that it's
//! acknowledging.
use std::collections::VecDeque;

use tor_cell::relaycell::RelayCmd;
use tor_cell::relaycell::UnparsedRelayMsg;
use tor_error::internal;

use crate::{Error, Result};
2021/// A circuit's send window.
22pub(crate) type CircSendWindow = SendWindow<CircParams>;
23/// A stream's send window.
24pub(crate) type StreamSendWindow = SendWindow<StreamParams>;
2526/// A circuit's receive window.
27pub(crate) type CircRecvWindow = RecvWindow<CircParams>;
28/// A stream's receive window.
29pub(crate) type StreamRecvWindow = RecvWindow<StreamParams>;
3031/// Tracks how many cells we can safely send on a circuit or stream.
32///
33/// Additionally, remembers a list of tags that could be used to
34/// acknowledge the cells we have already sent, so we know it's safe
35/// to send more.
36#[derive(Clone, Debug)]
37pub(crate) struct SendWindow<P>
38where
39P: WindowParams,
40{
41/// Current value for this window
42window: u16,
43/// Marker type to tell the compiler that the P type is used.
44_dummy: std::marker::PhantomData<P>,
45}
/// Helper: parametrizes a window to determine its maximum and its increment.
///
/// All items are associated functions (no `self`), so implementors are
/// normally zero-sized marker types that are only ever named as a type
/// parameter.
pub(crate) trait WindowParams {
    /// Largest allowable value for this window.
    #[allow(dead_code)] // TODO #1383 failure to ever use this is probably a bug
    fn maximum() -> u16;
    /// Increment for this window.
    fn increment() -> u16;
    /// The default starting value.
    fn start() -> u16;
}
5758/// Parameters used for SENDME windows on circuits: limit at 1000 cells,
59/// and each SENDME adjusts by 100.
60#[derive(Clone, Debug)]
61pub(crate) struct CircParams;
62impl WindowParams for CircParams {
63fn maximum() -> u16 {
641000
65}
66fn increment() -> u16 {
67100
68}
69fn start() -> u16 {
701000
71}
72}
7374/// Parameters used for SENDME windows on streams: limit at 500 cells,
75/// and each SENDME adjusts by 50.
76#[derive(Clone, Debug)]
77pub(crate) struct StreamParams;
78impl WindowParams for StreamParams {
79fn maximum() -> u16 {
80500
81}
82fn increment() -> u16 {
8350
84}
85fn start() -> u16 {
86500
87}
88}
/// Object used to validate SENDMEs: it stores the authenticated tags we
/// expect to see and verifies incoming tags against them.
///
/// Expected tags are kept in a queue, oldest first.
#[derive(Clone, Debug)]
pub(crate) struct SendmeValidator<T>
where
    T: PartialEq + Eq + Clone,
{
    /// Tag values that incoming "SENDME" messages need to match in order
    /// for us to send more data.
    tags: VecDeque<T>,
}
100101impl<T> SendmeValidator<T>
102where
103T: PartialEq + Eq + Clone,
104{
105/// Constructor
106pub(crate) fn new() -> Self {
107Self {
108 tags: VecDeque::new(),
109 }
110 }
111112/// Record a SENDME tag for future validation once we receive it.
113pub(crate) fn record<U>(&mut self, tag: &U)
114where
115U: Clone + Into<T>,
116 {
117self.tags.push_back(tag.clone().into());
118 }
119120/// Validate a received tag (if any). A mismatch leads to a protocol violation and the circuit
121 /// MUST be closed.
122pub(crate) fn validate<U>(&mut self, tag: Option<U>) -> Result<()>
123where
124T: PartialEq<U>,
125 {
126match (self.tags.front(), tag) {
127 (Some(t), Some(tag)) if t == &tag => {} // this is the right tag.
128(Some(_), None) => {} // didn't need a tag.
129(Some(_), Some(_)) => {
130return Err(Error::CircProto("Mismatched tag on circuit SENDME".into()));
131 }
132 (None, _) => {
133return Err(Error::CircProto(
134"Received a SENDME when none was expected".into(),
135 ));
136 }
137 }
138self.tags.pop_front();
139Ok(())
140 }
141142#[cfg(test)]
143pub(crate) fn expected_tags(&self) -> Vec<T> {
144self.tags.iter().map(Clone::clone).collect()
145 }
146}
147148impl<P> SendWindow<P>
149where
150P: WindowParams,
151{
152/// Construct a new SendWindow.
153pub(crate) fn new(window: u16) -> SendWindow<P> {
154 SendWindow {
155 window,
156 _dummy: std::marker::PhantomData,
157 }
158 }
159160/// Return true iff the SENDME tag should be recorded.
161pub(crate) fn should_record_tag(&self) -> bool {
162self.window % P::increment() == 0
163}
164165/// Remove one item from this window (since we've sent a cell).
166 /// If the window was empty, returns an error.
167pub(crate) fn take(&mut self) -> Result<()> {
168self.window = self.window.checked_sub(1).ok_or(Error::CircProto(
169"Called SendWindow::take() on empty SendWindow".into(),
170 ))?;
171Ok(())
172 }
173174/// Handle an incoming sendme.
175 ///
176 /// On failure, return an error: the caller must close the circuit due to a protocol violation.
177#[must_use = "didn't check whether SENDME was expected."]
178pub(crate) fn put(&mut self) -> Result<()> {
179// Overflow check.
180let new_window = self
181.window
182 .checked_add(P::increment())
183 .ok_or(Error::from(internal!("Overflow on SENDME window")))?;
184// Make sure we never go above our maximum else this wasn't expected.
185if new_window > P::maximum() {
186return Err(Error::CircProto("Unexpected stream SENDME".into()));
187 }
188self.window = new_window;
189Ok(())
190 }
191192/// Return the current send window value.
193pub(crate) fn window(&self) -> u16 {
194self.window
195 }
196}
197198/// Structure to track when we need to send SENDME cells for incoming data.
199#[derive(Clone, Debug)]
200pub(crate) struct RecvWindow<P: WindowParams> {
201/// Number of cells that we'd be willing to receive on this window
202 /// before sending a SENDME.
203window: u16,
204/// Marker type to tell the compiler that the P type is used.
205_dummy: std::marker::PhantomData<P>,
206}
207208impl<P: WindowParams> RecvWindow<P> {
209/// Create a new RecvWindow.
210pub(crate) fn new(window: u16) -> RecvWindow<P> {
211 RecvWindow {
212 window,
213 _dummy: std::marker::PhantomData,
214 }
215 }
216217/// Called when we've just received a cell; return true if we need to send
218 /// a sendme, and false otherwise.
219 ///
220 /// Returns None if we should not have sent the cell, and we just
221 /// violated the window.
222pub(crate) fn take(&mut self) -> Result<bool> {
223let v = self.window.checked_sub(1);
224if let Some(x) = v {
225self.window = x;
226// TODO: same note as in SendWindow.take(). I don't know if
227 // this truly matches the spec, but tor accepts it.
228Ok(x % P::increment() == 0)
229 } else {
230Err(Error::CircProto(
231"Received a data cell in violation of a window".into(),
232 ))
233 }
234 }
235236/// Reduce this window by `n`; give an error if this is not possible.
237pub(crate) fn decrement_n(&mut self, n: u16) -> crate::Result<()> {
238self.window = self.window.checked_sub(n).ok_or(Error::CircProto(
239"Received too many cells on a stream".into(),
240 ))?;
241Ok(())
242 }
243244/// Called when we've just sent a SENDME.
245pub(crate) fn put(&mut self) {
246self.window = self
247.window
248 .checked_add(P::increment())
249 .expect("Overflow detected while attempting to increment window");
250 }
251}
252253/// Return true if this message type is counted by flow-control windows.
254pub(crate) fn cmd_counts_towards_windows(cmd: RelayCmd) -> bool {
255 cmd == RelayCmd::DATA
256}
/// Return true if this message is counted by flow-control windows.
///
/// Test-only convenience wrapper: looks up the command of an already-parsed
/// message and defers to `cmd_counts_towards_windows`.
#[cfg(test)]
pub(crate) fn msg_counts_towards_windows(msg: &tor_cell::relaycell::msg::AnyRelayMsg) -> bool {
    // The `RelayMsg` trait must be in scope for `cmd()` to resolve.
    use tor_cell::relaycell::RelayMsg;
    cmd_counts_towards_windows(msg.cmd())
}
264265/// Return true if this message is counted by flow-control windows.
266pub(crate) fn cell_counts_towards_windows(cell: &UnparsedRelayMsg) -> bool {
267 cmd_counts_towards_windows(cell.cmd())
268}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{msg, AnyRelayMsgOuter, RelayCellFormat, StreamId};

    /// Only DATA messages count towards windows, whether parsed or unparsed.
    #[test]
    fn what_counts() {
        let mut rng = testing_rng();
        let fmt = RelayCellFormat::V0;
        let m = msg::Begin::new("www.torproject.org", 443, 0)
            .unwrap()
            .into();
        assert!(!msg_counts_towards_windows(&m));
        assert!(!cell_counts_towards_windows(
            &UnparsedRelayMsg::from_singleton_body(
                RelayCellFormat::V0,
                AnyRelayMsgOuter::new(StreamId::new(77), m)
                    .encode(fmt, &mut rng)
                    .unwrap()
            )
            .unwrap()
        ));

        let m = msg::Data::new(&b"Education is not a prerequisite to political control-political control is the cause of popular education."[..]).unwrap().into(); // Du Bois
        assert!(msg_counts_towards_windows(&m));
        assert!(cell_counts_towards_windows(
            &UnparsedRelayMsg::from_singleton_body(
                RelayCellFormat::V0,
                AnyRelayMsgOuter::new(StreamId::new(128), m)
                    .encode(fmt, &mut rng)
                    .unwrap()
            )
            .unwrap()
        ));
    }

    /// Exercise take / decrement_n / put on a stream receive window.
    #[test]
    fn recvwindow() {
        let mut w: RecvWindow<StreamParams> = RecvWindow::new(500);

        // The 50th cell brings the window to a multiple of the increment.
        for _ in 0..49 {
            assert!(!w.take().unwrap());
        }
        assert!(w.take().unwrap());
        assert_eq!(w.window, 450);

        assert!(w.decrement_n(123).is_ok());
        assert_eq!(w.window, 327);

        w.put();
        assert_eq!(w.window, 377);

        // failing decrement.
        assert!(w.decrement_n(400).is_err());
        // failing take.
        assert!(w.decrement_n(377).is_ok());
        assert!(w.take().is_err());
    }

    /// Build a circuit send window at its starting value.
    fn new_sendwindow() -> SendWindow<CircParams> {
        SendWindow::new(1000)
    }

    /// Exercise take / put on a circuit send window.
    #[test]
    fn sendwindow_basic() -> Result<()> {
        let mut w = new_sendwindow();

        w.take()?;
        assert_eq!(w.window(), 999);
        for _ in 0_usize..98 {
            w.take()?;
        }
        assert_eq!(w.window(), 901);

        w.take()?;
        assert_eq!(w.window(), 900);

        w.take()?;
        assert_eq!(w.window(), 899);

        // A SENDME grows the window by one increment.
        w.put()?;
        assert_eq!(w.window(), 999);

        for _ in 0_usize..300 {
            w.take()?;
        }

        // And again, after sending more cells.
        w.put()?;
        assert_eq!(w.window(), 799);

        Ok(())
    }

    /// Taking from an exhausted send window is an error.
    #[test]
    fn sendwindow_erroring() -> Result<()> {
        let mut w = new_sendwindow();
        for _ in 0_usize..1000 {
            w.take()?;
        }
        assert_eq!(w.window(), 0);

        let ready = w.take();
        assert!(ready.is_err());
        Ok(())
    }
}