tor_proto/tunnel/
halfstream.rs

1//! Type and code for handling a "half-closed" stream.
2//!
3//! A half-closed stream is one that we've sent an END on, but where
4//! we might still receive some cells.
5
6use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7use crate::stream::{AnyCmdChecker, StreamSendFlowControl, StreamStatus};
8use crate::Result;
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10
11/// Type to track state of half-closed streams.
12///
13/// A half-closed stream is one where we've sent an END cell, but where
14/// the other side might still send us data.
15///
16/// We need to track these streams instead of forgetting about them entirely,
17/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19#[derive(Debug)]
20pub(super) struct HalfStream {
21    /// Send flow control for this stream. Used to detect whether we get too
22    /// many SENDME cells.
23    send_flow_control: StreamSendFlowControl,
24    /// Receive window for this stream. Used to detect whether we get too
25    /// many data cells.
26    recvw: StreamRecvWindow,
27    /// Object to tell us which cells to accept on this stream.
28    cmd_checker: AnyCmdChecker,
29}
30
31impl HalfStream {
32    /// Create a new half-closed stream.
33    pub(super) fn new(
34        send_flow_control: StreamSendFlowControl,
35        recvw: StreamRecvWindow,
36        cmd_checker: AnyCmdChecker,
37    ) -> Self {
38        HalfStream {
39            send_flow_control,
40            recvw,
41            cmd_checker,
42        }
43    }
44
45    /// Process an incoming message and adjust this HalfStream accordingly.
46    /// Give an error if the protocol has been violated.
47    ///
48    /// The caller must handle END cells; it is an internal error to pass
49    /// END cells to this method.
50    /// no ends here.
51    pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
52        use StreamStatus::*;
53
54        // We handle SENDME/XON/XOFF separately, and don't give it to the checker.
55        //
56        // TODO: this logic is the same as `CircHop::deliver_msg_to_stream`; we should refactor this
57        // if possible
58        match msg.cmd() {
59            RelayCmd::SENDME => {
60                self.send_flow_control.put_for_incoming_sendme(msg)?;
61                return Ok(Open);
62            }
63            RelayCmd::XON => {
64                self.send_flow_control.handle_incoming_xon(msg)?;
65                return Ok(Open);
66            }
67            RelayCmd::XOFF => {
68                self.send_flow_control.handle_incoming_xoff(msg)?;
69                return Ok(Open);
70            }
71            _ => {}
72        }
73
74        if cmd_counts_towards_windows(msg.cmd()) {
75            self.recvw.take()?;
76        }
77
78        let status = self.cmd_checker.check_msg(&msg)?;
79        self.cmd_checker.consume_checked_msg(msg)?;
80        Ok(status)
81    }
82}
83
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
        stream::DataCmdChecker,
    };
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
        msg::{self, AnyRelayMsg},
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
    };

    /// Encode `val` as a V0 relay cell on stream 77, then re-read the
    /// encoded body as an `UnparsedRelayMsg`.
    fn to_unparsed<R>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg
    where
        R: Rng + CryptoRng,
    {
        let encoded = AnyRelayMsgOuter::new(StreamId::new(77), val)
            .encode(RelayCellFormat::V0, rng)
            .expect("encoding failed");
        UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, encoded).unwrap()
    }

    /// Feed `val` to `hs` as an incoming message and report what `hs` made of it.
    fn deliver<R>(hs: &mut HalfStream, rng: &mut R, val: AnyRelayMsg) -> Result<StreamStatus>
    where
        R: Rng + CryptoRng,
    {
        hs.handle_msg(to_unparsed(rng, val))
    }

    /// A half-stream with window-based send flow control, both windows
    /// at 20, and a checker from `DataCmdChecker::new_any()`.
    fn hs_new() -> HalfStream {
        let send_window = StreamSendWindow::new(20);
        let recv_window = StreamRecvWindow::new(20);
        HalfStream::new(
            StreamSendFlowControl::new_window_based(send_window),
            recv_window,
            DataCmdChecker::new_any(),
        )
    }

    #[test]
    fn halfstream_sendme() {
        let mut rng = testing_rng();

        // Stream-level SENDMEs are not authenticated, so the only way to tell that one was
        // unexpected is for the window to exceed its maximum.
        //
        // With the window starting at 450, the first SENDME raises it to 500 (the maximum),
        // so the second SENDME pushes it over and we notice that it was unexpected.
        let mut hs = HalfStream::new(
            StreamSendFlowControl::new_window_based(StreamSendWindow::new(450)),
            StreamRecvWindow::new(20),
            DataCmdChecker::new_any(),
        );
        let sendme = msg::Sendme::new_empty();

        // The first SENDME is acceptable...
        let first = deliver(&mut hs, &mut rng, sendme.clone().into());
        assert!(first.is_ok());

        // ...but a second one was never expected.
        let err = deliver(&mut hs, &mut rng, sendme.into()).err().unwrap();
        assert_eq!(
            err.to_string(),
            "Circuit protocol violation: Unexpected stream SENDME"
        );
    }

    #[test]
    fn halfstream_data() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // Setup did not include a CONNECTED cell, so deliver one first.
        deliver(&mut hs, &mut rng, msg::Connected::new_empty().into()).unwrap();

        // The receive window allows exactly 20 data cells.
        let data = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0..20 {
            let outcome = deliver(&mut hs, &mut rng, data.clone().into());
            assert!(outcome.is_ok());
        }

        // The twenty-first is a protocol violation.
        let err = deliver(&mut hs, &mut rng, data.into()).err().unwrap();
        assert_eq!(
            err.to_string(),
            "Circuit protocol violation: Received a data cell in violation of a window"
        );
    }

    #[test]
    fn halfstream_connected() {
        let mut rng = testing_rng();
        let connected = msg::Connected::new_empty();

        // A fresh half-stream accepts a single CONNECTED, and rejects a repeat.
        let mut hs = hs_new();
        let first = deliver(&mut hs, &mut rng, connected.clone().into());
        assert!(first.is_ok());
        let second = deliver(&mut hs, &mut rng, connected.clone().into());
        assert!(second.is_err());

        // When the checker has already seen a CONNECTED before the half-stream
        // is built, the very first CONNECTED delivered is rejected.
        let mut cmd_checker = DataCmdChecker::new_any();
        cmd_checker
            .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
            .unwrap();
        let mut hs_after_connected = HalfStream::new(
            StreamSendFlowControl::new_window_based(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            cmd_checker,
        );
        let err = deliver(&mut hs_after_connected, &mut rng, connected.into())
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "Stream protocol violation: Received CONNECTED twice on a stream."
        );
    }

    #[test]
    fn halfstream_other() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // An EXTENDED2 message is rejected on a data stream.
        let extended = msg::Extended2::new(Vec::new());
        let err = deliver(&mut hs, &mut rng, extended.into()).err().unwrap();
        assert_eq!(
            err.to_string(),
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
        );
    }
}