//! Type and code for handling a "half-closed" stream.
//!
//! A half-closed stream is one that we've sent an END on, but where
//! we might still receive some cells.

            
6
use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7
use crate::stream::{AnyCmdChecker, StreamSendFlowControl, StreamStatus};
8
use crate::{Error, Result};
9
use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10

            
11
/// Type to track state of half-closed streams.
12
///
13
/// A half-closed stream is one where we've sent an END cell, but where
14
/// the other side might still send us data.
15
///
16
/// We need to track these streams instead of forgetting about them entirely,
17
/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18
/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19
#[derive(Debug)]
20
pub(super) struct HalfStream {
21
    /// Send flow control for this stream. Used to detect whether we get too
22
    /// many SENDME cells.
23
    send_flow_control: StreamSendFlowControl,
24
    /// Receive window for this stream. Used to detect whether we get too
25
    /// many data cells.
26
    recvw: StreamRecvWindow,
27
    /// Object to tell us which cells to accept on this stream.
28
    cmd_checker: AnyCmdChecker,
29
}
30

            
31
impl HalfStream {
32
    /// Create a new half-closed stream.
33
52
    pub(super) fn new(
34
52
        send_flow_control: StreamSendFlowControl,
35
52
        recvw: StreamRecvWindow,
36
52
        cmd_checker: AnyCmdChecker,
37
52
    ) -> Self {
38
52
        HalfStream {
39
52
            send_flow_control,
40
52
            recvw,
41
52
            cmd_checker,
42
52
        }
43
52
    }
44

            
45
    /// Process an incoming message and adjust this HalfStream accordingly.
46
    /// Give an error if the protocol has been violated.
47
    ///
48
    /// The caller must handle END cells; it is an internal error to pass
49
    /// END cells to this method.
50
    /// no ends here.
51
56
    pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
52
        use tor_cell::relaycell::msg::Sendme;
53
        use StreamStatus::*;
54
56
        if msg.cmd() == RelayCmd::SENDME {
55
            // We handle SENDME separately, and don't give it to the checker.
56
4
            let _ = msg
57
4
                .decode::<Sendme>()
58
4
                .map_err(|e| Error::from_bytes_err(e, "SENDME on half-closed stream"))?;
59
4
            self.send_flow_control.put_for_incoming_sendme()?;
60
2
            return Ok(Open);
61
52
        }
62
52

            
63
52
        if cmd_counts_towards_windows(msg.cmd()) {
64
42
            self.recvw.take()?;
65
10
        }
66

            
67
50
        let status = self.cmd_checker.check_msg(&msg)?;
68
44
        self.cmd_checker.consume_checked_msg(msg)?;
69
44
        Ok(status)
70
56
    }
71
}
72

            
73
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
        stream::DataCmdChecker,
    };
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
        msg::{self, AnyRelayMsg},
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
    };

    /// Encode `val` as a V0 relay cell for stream 77, and wrap the encoded
    /// body as an `UnparsedRelayMsg`.
    fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
        let body = AnyRelayMsgOuter::new(StreamId::new(77), val)
            .encode(RelayCellFormat::V0, rng)
            .expect("encoding failed");
        UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
    }

    /// Hand `incoming` to `hs`, which must reject it; return the displayed
    /// text of the resulting error.
    fn rejection_text(hs: &mut HalfStream, incoming: UnparsedRelayMsg) -> String {
        let failure = hs.handle_msg(incoming).err().unwrap();
        failure.to_string()
    }

    /// Build a `HalfStream` with a send window of 20, a receive window of 20,
    /// and the given command checker.
    fn hs_with_checker(cmd_checker: AnyCmdChecker) -> HalfStream {
        HalfStream::new(
            StreamSendFlowControl::new_window_based(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            cmd_checker,
        )
    }

    /// Build a `HalfStream` with 20/20 windows and a fresh `new_any` checker.
    fn hs_new() -> HalfStream {
        hs_with_checker(DataCmdChecker::new_any())
    }

    #[test]
    fn halfstream_sendme() {
        let mut rng = testing_rng();

        // Stream-level SENDMEs are not authenticated, so the only way to tell
        // that one was unexpected is for the window to exceed its maximum.
        //
        // With the window starting at 450, the first SENDME raises it to 500
        // (the maximum); the second one would exceed that, and so we notice
        // that it was unexpected.
        let send_window = StreamSendWindow::new(450);
        let flow_control = StreamSendFlowControl::new_window_based(send_window);
        let mut hs = HalfStream::new(
            flow_control,
            StreamRecvWindow::new(20),
            DataCmdChecker::new_any(),
        );

        let sendme = msg::Sendme::new_empty();

        // The first SENDME is accepted...
        let first = hs.handle_msg(to_unparsed(&mut rng, sendme.clone().into()));
        assert!(first.is_ok());

        // ...but a second one was not expected.
        let complaint = rejection_text(&mut hs, to_unparsed(&mut rng, sendme.into()));
        assert_eq!(
            complaint,
            "Circuit protocol violation: Unexpected stream SENDME"
        );
    }

    #[test]
    fn halfstream_data() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // No CONNECTED cell was given during setup, so deliver one now.
        let connected = to_unparsed(&mut rng, msg::Connected::new_empty().into());
        hs.handle_msg(connected).unwrap();

        // The receive window allows exactly 20 data cells.
        let data = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0..20 {
            let outcome = hs.handle_msg(to_unparsed(&mut rng, data.clone().into()));
            assert!(outcome.is_ok());
        }

        // The 21st is a protocol violation.
        let complaint = rejection_text(&mut hs, to_unparsed(&mut rng, data.into()));
        assert_eq!(
            complaint,
            "Circuit protocol violation: Received a data cell in violation of a window"
        );
    }

    #[test]
    fn halfstream_connected() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A fresh half-stream accepts one CONNECTED, and no more.
        let connected = msg::Connected::new_empty();
        let first = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(first.is_ok());
        let second = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(second.is_err());

        // If the checker has already been shown a CONNECTED before the
        // half-stream is built, then a CONNECTED delivered through the
        // half-stream is rejected, with a specific message.
        let mut primed_checker = DataCmdChecker::new_any();
        primed_checker
            .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
            .unwrap();
        let mut primed_hs = hs_with_checker(primed_checker);

        let complaint = rejection_text(&mut primed_hs, to_unparsed(&mut rng, connected.into()));
        assert_eq!(
            complaint,
            "Stream protocol violation: Received CONNECTED twice on a stream."
        );
    }

    #[test]
    fn halfstream_other() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A command that does not belong on a data stream is rejected.
        let extended2 = msg::Extended2::new(Vec::new());
        let complaint = rejection_text(&mut hs, to_unparsed(&mut rng, extended2.into()));
        assert_eq!(
            complaint,
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
        );
    }
}