1
//! Type and code for handling a "half-closed" stream.
//!
//! A half-closed stream is one that we've sent an END on, but where
//! we might still receive some cells.
5

            
6
use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7
use crate::stream::{AnyCmdChecker, StreamFlowControl, StreamStatus};
8
use crate::Result;
9
use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10

            
11
/// Type to track state of half-closed streams.
12
///
13
/// A half-closed stream is one where we've sent an END cell, but where
14
/// the other side might still send us data.
15
///
16
/// We need to track these streams instead of forgetting about them entirely,
17
/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18
/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19
#[derive(Debug)]
20
pub(super) struct HalfStream {
21
    /// Flow control for this stream.
22
    ///
23
    /// Used to process incoming flow control messages (SENDME, XON, etc).
24
    flow_control: StreamFlowControl,
25
    /// Receive window for this stream. Used to detect whether we get too
26
    /// many data cells.
27
    recvw: StreamRecvWindow,
28
    /// Object to tell us which cells to accept on this stream.
29
    cmd_checker: AnyCmdChecker,
30
}
31

            
32
impl HalfStream {
33
    /// Create a new half-closed stream.
34
48
    pub(super) fn new(
35
48
        flow_control: StreamFlowControl,
36
48
        recvw: StreamRecvWindow,
37
48
        cmd_checker: AnyCmdChecker,
38
48
    ) -> Self {
39
48
        HalfStream {
40
48
            flow_control,
41
48
            recvw,
42
48
            cmd_checker,
43
48
        }
44
48
    }
45

            
46
    /// Process an incoming message and adjust this HalfStream accordingly.
47
    /// Give an error if the protocol has been violated.
48
    ///
49
    /// The caller must handle END cells; it is an internal error to pass
50
    /// END cells to this method.
51
    /// no ends here.
52
56
    pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
53
        use StreamStatus::*;
54

            
55
        // We handle SENDME/XON/XOFF separately, and don't give it to the checker.
56
        //
57
        // TODO: this logic is the same as `CircHop::deliver_msg_to_stream`; we should refactor this
58
        // if possible
59
56
        match msg.cmd() {
60
            RelayCmd::SENDME => {
61
4
                self.flow_control.put_for_incoming_sendme(msg)?;
62
2
                return Ok(Open);
63
            }
64
            RelayCmd::XON => {
65
                self.flow_control.handle_incoming_xon(msg)?;
66
                return Ok(Open);
67
            }
68
            RelayCmd::XOFF => {
69
                self.flow_control.handle_incoming_xoff(msg)?;
70
                return Ok(Open);
71
            }
72
52
            _ => {}
73
52
        }
74
52

            
75
52
        if cmd_counts_towards_windows(msg.cmd()) {
76
42
            self.recvw.take()?;
77
10
        }
78

            
79
50
        let status = self.cmd_checker.check_msg(&msg)?;
80
44
        self.cmd_checker.consume_checked_msg(msg)?;
81
44
        Ok(status)
82
56
    }
83
}
84

            
85
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
        stream::DataCmdChecker,
    };
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
        msg::{self, AnyRelayMsg},
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
    };

    /// Encode `val` as a V0 relay cell on stream 77, and wrap the encoded
    /// body back up as an `UnparsedRelayMsg`.
    fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
        let body = AnyRelayMsgOuter::new(StreamId::new(77), val)
            .encode(RelayCellFormat::V0, rng)
            .expect("encoding failed");
        UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
    }

    /// Feed `val` to `hs`, require that it is rejected, and return the
    /// text of the resulting error.
    fn rejection<R: Rng + CryptoRng>(hs: &mut HalfStream, rng: &mut R, val: AnyRelayMsg) -> String {
        hs.handle_msg(to_unparsed(rng, val))
            .err()
            .unwrap()
            .to_string()
    }

    /// A half-closed stream with send and receive windows of 20, and a
    /// fresh data-stream command checker.
    fn hs_new() -> HalfStream {
        HalfStream::new(
            StreamFlowControl::new_window_based(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            DataCmdChecker::new_any(),
        )
    }

    #[test]
    fn halfstream_sendme() {
        let mut rng = testing_rng();

        // Stream level SENDMEs are not authenticated, so the only way to tell that one was
        // unexpected is for the window to bust its maximum.
        //
        // With the window starting at 450, the first SENDME increments it to 500 (the maximum);
        // the second SENDME then busts that, and we notice that it was unexpected.
        let mut hs = HalfStream::new(
            StreamFlowControl::new_window_based(StreamSendWindow::new(450)),
            StreamRecvWindow::new(20),
            DataCmdChecker::new_any(),
        );

        // The first SENDME is accepted...
        let first = hs.handle_msg(to_unparsed(&mut rng, msg::Sendme::new_empty().into()));
        assert!(first.is_ok());

        // ...but no more were expected!
        assert_eq!(
            rejection(&mut hs, &mut rng, msg::Sendme::new_empty().into()),
            "Circuit protocol violation: Unexpected stream SENDME"
        );
    }

    #[test]
    fn halfstream_data() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // No connected cell was given during setup, so deliver it now.
        let connected = to_unparsed(&mut rng, msg::Connected::new_empty().into());
        hs.handle_msg(connected).unwrap();

        // The receive window admits exactly 20 data cells.
        let data = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0..20 {
            let outcome = hs.handle_msg(to_unparsed(&mut rng, data.clone().into()));
            assert!(outcome.is_ok());
        }

        // The 21st is a protocol violation.
        assert_eq!(
            rejection(&mut hs, &mut rng, data.into()),
            "Circuit protocol violation: Received a data cell in violation of a window"
        );
    }

    #[test]
    fn halfstream_connected() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A fresh checker accepts one CONNECTED, and no more.
        let once = hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()));
        assert!(once.is_ok());
        let twice = hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()));
        assert!(twice.is_err());

        // Now build the half-stream around a checker that has already checked a CONNECTED
        // before the stream was half-closed: the next CONNECTED must be rejected as a repeat.
        let mut primed_checker = DataCmdChecker::new_any();
        primed_checker
            .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
            .unwrap();
        let mut hs = HalfStream::new(
            StreamFlowControl::new_window_based(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            primed_checker,
        );
        assert_eq!(
            rejection(&mut hs, &mut rng, msg::Connected::new_empty().into()),
            "Stream protocol violation: Received CONNECTED twice on a stream."
        );
    }

    #[test]
    fn halfstream_other() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A command that does not belong on a data stream is rejected outright.
        assert_eq!(
            rejection(&mut hs, &mut rng, msg::Extended2::new(Vec::new()).into()),
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
        );
    }
}