1
//! Management for flow control windows.
2
//!
3
//! Tor maintains a separate windows on circuits and on streams.
4
//! These are controlled by SENDME cells, which (confusingly) are
5
//! applied either at the circuit or the stream level depending on
6
//! whether they have a stream ID set.
7
//!
8
//! Circuit sendmes are _authenticated_: they include a cryptographic
9
//! tag generated by the cryptography layer.  This tag proves that the
10
//! other side of the circuit really has read all of the data that it's
11
//! acknowledging.
12

            
13
use std::collections::VecDeque;
14

            
15
use tor_cell::relaycell::RelayCmd;
16
use tor_cell::relaycell::UnparsedRelayMsg;
17
use tor_error::internal;
18

            
19
use crate::{Error, Result};
20

            
21
/// A circuit's send window.
22
pub(crate) type CircSendWindow = SendWindow<CircParams>;
23
/// A stream's send window.
24
pub(crate) type StreamSendWindow = SendWindow<StreamParams>;
25

            
26
/// A circuit's receive window.
27
pub(crate) type CircRecvWindow = RecvWindow<CircParams>;
28
/// A stream's receive window.
29
pub(crate) type StreamRecvWindow = RecvWindow<StreamParams>;
30

            
31
/// Tracks how many cells we can safely send on a circuit or stream.
32
///
33
/// Additionally, remembers a list of tags that could be used to
34
/// acknowledge the cells we have already sent, so we know it's safe
35
/// to send more.
36
#[derive(Clone, Debug)]
37
pub(crate) struct SendWindow<P>
38
where
39
    P: WindowParams,
40
{
41
    /// Current value for this window
42
    window: u16,
43
    /// Marker type to tell the compiler that the P type is used.
44
    _dummy: std::marker::PhantomData<P>,
45
}
46

            
47
/// Helper: parametrizes a window to determine its maximum and its increment.
48
pub(crate) trait WindowParams {
49
    /// Largest allowable value for this window.
50
    #[allow(dead_code)] // TODO #1383 failure to ever use this is probably a bug
51
    fn maximum() -> u16;
52
    /// Increment for this window.
53
    fn increment() -> u16;
54
    /// The default starting value.
55
    fn start() -> u16;
56
}
57

            
58
/// Parameters used for SENDME windows on circuits: limit at 1000 cells,
59
/// and each SENDME adjusts by 100.
60
#[derive(Clone, Debug)]
61
pub(crate) struct CircParams;
62
impl WindowParams for CircParams {
63
8
    fn maximum() -> u16 {
64
8
        1000
65
8
    }
66
2760
    fn increment() -> u16 {
67
2760
        100
68
2760
    }
69
400
    fn start() -> u16 {
70
400
        1000
71
400
    }
72
}
73

            
74
/// Parameters used for SENDME windows on streams: limit at 500 cells,
75
/// and each SENDME adjusts by 50.
76
#[derive(Clone, Debug)]
77
pub(crate) struct StreamParams;
78
impl WindowParams for StreamParams {
79
8
    fn maximum() -> u16 {
80
8
        500
81
8
    }
82
174
    fn increment() -> u16 {
83
174
        50
84
174
    }
85
    fn start() -> u16 {
86
        500
87
    }
88
}
89

            
90
/// Object used to validate SENDMEs as in managing the authenticated tag and verifying it.
91
#[derive(Clone, Debug)]
92
pub(crate) struct SendmeValidator<T>
93
where
94
    T: PartialEq + Eq + Clone,
95
{
96
    /// Tag values that incoming "SENDME" messages need to match in order
97
    /// for us to send more data.
98
    tags: VecDeque<T>,
99
}
100

            
101
impl<T> SendmeValidator<T>
102
where
103
    T: PartialEq + Eq + Clone,
104
{
105
    /// Constructor
106
408
    pub(crate) fn new() -> Self {
107
408
        Self {
108
408
            tags: VecDeque::new(),
109
408
        }
110
408
    }
111

            
112
    /// Record a SENDME tag for future validation once we receive it.
113
24
    pub(crate) fn record<U>(&mut self, tag: &U)
114
24
    where
115
24
        U: Clone + Into<T>,
116
24
    {
117
24
        self.tags.push_back(tag.clone().into());
118
24
    }
119

            
120
    /// Validate a received tag (if any). A mismatch leads to a protocol violation and the circuit
121
    /// MUST be closed.
122
8
    pub(crate) fn validate<U>(&mut self, tag: Option<U>) -> Result<()>
123
8
    where
124
8
        T: PartialEq<U>,
125
8
    {
126
8
        match (self.tags.front(), tag) {
127
8
            (Some(t), Some(tag)) if t == &tag => {} // this is the right tag.
128
            (Some(_), None) => {}                   // didn't need a tag.
129
            (Some(_), Some(_)) => {
130
4
                return Err(Error::CircProto("Mismatched tag on circuit SENDME".into()));
131
            }
132
            (None, _) => {
133
                return Err(Error::CircProto(
134
                    "Received a SENDME when none was expected".into(),
135
                ));
136
            }
137
        }
138
4
        self.tags.pop_front();
139
4
        Ok(())
140
8
    }
141

            
142
    #[cfg(test)]
143
8
    pub(crate) fn expected_tags(&self) -> Vec<T> {
144
8
        self.tags.iter().map(Clone::clone).collect()
145
8
    }
146
}
147

            
148
impl<P> SendWindow<P>
149
where
150
    P: WindowParams,
151
{
152
    /// Construct a new SendWindow.
153
750
    pub(crate) fn new(window: u16) -> SendWindow<P> {
154
750
        SendWindow {
155
750
            window,
156
750
            _dummy: std::marker::PhantomData,
157
750
        }
158
750
    }
159

            
160
    /// Return true iff the SENDME tag should be recorded.
161
2728
    pub(crate) fn should_record_tag(&self) -> bool {
162
2728
        self.window % P::increment() == 0
163
2728
    }
164

            
165
    /// Remove one item from this window (since we've sent a cell).
166
    /// If the window was empty, returns an error.
167
8260
    pub(crate) fn take(&mut self) -> Result<()> {
168
8260
        self.window = self.window.checked_sub(1).ok_or(Error::CircProto(
169
8260
            "Called SendWindow::take() on empty SendWindow".into(),
170
8260
        ))?;
171
8258
        Ok(())
172
8260
    }
173

            
174
    /// Handle an incoming sendme.
175
    ///
176
    /// On failure, return an error: the caller must close the circuit due to a protocol violation.
177
    #[must_use = "didn't check whether SENDME was expected."]
178
16
    pub(crate) fn put(&mut self) -> Result<()> {
179
        // Overflow check.
180
16
        let new_window = self
181
16
            .window
182
16
            .checked_add(P::increment())
183
16
            .ok_or(Error::from(internal!("Overflow on SENDME window")))?;
184
        // Make sure we never go above our maximum else this wasn't expected.
185
16
        if new_window > P::maximum() {
186
2
            return Err(Error::CircProto("Unexpected stream SENDME".into()));
187
14
        }
188
14
        self.window = new_window;
189
14
        Ok(())
190
16
    }
191

            
192
    /// Return the current send window value.
193
23640
    pub(crate) fn window(&self) -> u16 {
194
23640
        self.window
195
23640
    }
196
}
197

            
198
/// Structure to track when we need to send SENDME cells for incoming data.
199
#[derive(Clone, Debug)]
200
pub(crate) struct RecvWindow<P: WindowParams> {
201
    /// Number of cells that we'd be willing to receive on this window
202
    /// before sending a SENDME.
203
    window: u16,
204
    /// Marker type to tell the compiler that the P type is used.
205
    _dummy: std::marker::PhantomData<P>,
206
}
207

            
208
impl<P: WindowParams> RecvWindow<P> {
209
    /// Create a new RecvWindow.
210
534
    pub(crate) fn new(window: u16) -> RecvWindow<P> {
211
534
        RecvWindow {
212
534
            window,
213
534
            _dummy: std::marker::PhantomData,
214
534
        }
215
534
    }
216

            
217
    /// Called when we've just received a cell; return true if we need to send
218
    /// a sendme, and false otherwise.
219
    ///
220
    /// Returns None if we should not have sent the cell, and we just
221
    /// violated the window.
222
192
    pub(crate) fn take(&mut self) -> Result<bool> {
223
192
        let v = self.window.checked_sub(1);
224
192
        if let Some(x) = v {
225
188
            self.window = x;
226
188
            // TODO: same note as in SendWindow.take(). I don't know if
227
188
            // this truly matches the spec, but tor accepts it.
228
188
            Ok(x % P::increment() == 0)
229
        } else {
230
4
            Err(Error::CircProto(
231
4
                "Received a data cell in violation of a window".into(),
232
4
            ))
233
        }
234
192
    }
235

            
236
    /// Reduce this window by `n`; give an error if this is not possible.
237
48
    pub(crate) fn decrement_n(&mut self, n: u16) -> crate::Result<()> {
238
48
        self.window = self.window.checked_sub(n).ok_or(Error::CircProto(
239
48
            "Received too many cells on a stream".into(),
240
48
        ))?;
241
46
        Ok(())
242
48
    }
243

            
244
    /// Called when we've just sent a SENDME.
245
2
    pub(crate) fn put(&mut self) {
246
2
        self.window = self
247
2
            .window
248
2
            .checked_add(P::increment())
249
2
            .expect("Overflow detected while attempting to increment window");
250
2
    }
251
}
252

            
253
/// Return true if this message type is counted by flow-control windows.
254
19664
pub(crate) fn cmd_counts_towards_windows(cmd: RelayCmd) -> bool {
255
19664
    cmd == RelayCmd::DATA
256
19664
}
257

            
258
/// Return true if this message is counted by flow-control windows.
259
#[cfg(test)]
260
4
pub(crate) fn msg_counts_towards_windows(msg: &tor_cell::relaycell::msg::AnyRelayMsg) -> bool {
261
    use tor_cell::relaycell::RelayMsg;
262
4
    cmd_counts_towards_windows(msg.cmd())
263
4
}
264

            
265
/// Return true if this message is counted by flow-control windows.
266
92
pub(crate) fn cell_counts_towards_windows(cell: &UnparsedRelayMsg) -> bool {
267
92
    cmd_counts_towards_windows(cell.cmd())
268
92
}
269

            
270
#[cfg(test)]
271
mod test {
272
    // @@ begin test lint list maintained by maint/add_warning @@
273
    #![allow(clippy::bool_assert_comparison)]
274
    #![allow(clippy::clone_on_copy)]
275
    #![allow(clippy::dbg_macro)]
276
    #![allow(clippy::mixed_attributes_style)]
277
    #![allow(clippy::print_stderr)]
278
    #![allow(clippy::print_stdout)]
279
    #![allow(clippy::single_char_pattern)]
280
    #![allow(clippy::unwrap_used)]
281
    #![allow(clippy::unchecked_duration_subtraction)]
282
    #![allow(clippy::useless_vec)]
283
    #![allow(clippy::needless_pass_by_value)]
284
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
285
    use super::*;
286
    use tor_basic_utils::test_rng::testing_rng;
287
    use tor_cell::relaycell::{msg, AnyRelayMsgOuter, RelayCellFormat, StreamId};
288

            
289
    #[test]
290
    fn what_counts() {
291
        let mut rng = testing_rng();
292
        let fmt = RelayCellFormat::V0;
293
        let m = msg::Begin::new("www.torproject.org", 443, 0)
294
            .unwrap()
295
            .into();
296
        assert!(!msg_counts_towards_windows(&m));
297
        assert!(!cell_counts_towards_windows(
298
            &UnparsedRelayMsg::from_singleton_body(
299
                RelayCellFormat::V0,
300
                AnyRelayMsgOuter::new(StreamId::new(77), m)
301
                    .encode(fmt, &mut rng)
302
                    .unwrap()
303
            )
304
            .unwrap()
305
        ));
306

            
307
        let m = msg::Data::new(&b"Education is not a prerequisite to political control-political control is the cause of popular education."[..]).unwrap().into(); // Du Bois
308
        assert!(msg_counts_towards_windows(&m));
309
        assert!(cell_counts_towards_windows(
310
            &UnparsedRelayMsg::from_singleton_body(
311
                RelayCellFormat::V0,
312
                AnyRelayMsgOuter::new(StreamId::new(128), m)
313
                    .encode(fmt, &mut rng)
314
                    .unwrap()
315
            )
316
            .unwrap()
317
        ));
318
    }
319

            
320
    #[test]
321
    fn recvwindow() {
322
        let mut w: RecvWindow<StreamParams> = RecvWindow::new(500);
323

            
324
        for _ in 0..49 {
325
            assert!(!w.take().unwrap());
326
        }
327
        assert!(w.take().unwrap());
328
        assert_eq!(w.window, 450);
329

            
330
        assert!(w.decrement_n(123).is_ok());
331
        assert_eq!(w.window, 327);
332

            
333
        w.put();
334
        assert_eq!(w.window, 377);
335

            
336
        // failing decrement.
337
        assert!(w.decrement_n(400).is_err());
338
        // failing take.
339
        assert!(w.decrement_n(377).is_ok());
340
        assert!(w.take().is_err());
341
    }
342

            
343
    fn new_sendwindow() -> SendWindow<CircParams> {
344
        SendWindow::new(1000)
345
    }
346

            
347
    #[test]
348
    fn sendwindow_basic() -> Result<()> {
349
        let mut w = new_sendwindow();
350

            
351
        w.take()?;
352
        assert_eq!(w.window(), 999);
353
        for _ in 0_usize..98 {
354
            w.take()?;
355
        }
356
        assert_eq!(w.window(), 901);
357

            
358
        w.take()?;
359
        assert_eq!(w.window(), 900);
360

            
361
        w.take()?;
362
        assert_eq!(w.window(), 899);
363

            
364
        // Try putting a good tag.
365
        w.put()?;
366
        assert_eq!(w.window(), 999);
367

            
368
        for _ in 0_usize..300 {
369
            w.take()?;
370
        }
371

            
372
        // Put without a tag.
373
        w.put()?;
374
        assert_eq!(w.window(), 799);
375

            
376
        Ok(())
377
    }
378

            
379
    #[test]
380
    fn sendwindow_erroring() -> Result<()> {
381
        let mut w = new_sendwindow();
382
        for _ in 0_usize..1000 {
383
            w.take()?;
384
        }
385
        assert_eq!(w.window(), 0);
386

            
387
        let ready = w.take();
388
        assert!(ready.is_err());
389
        Ok(())
390
    }
391
}