tor_proto/tunnel/
halfstream.rs

1//! Type and code for handling a "half-closed" stream.
2//!
3//! A half-closed stream is one that we've sent an END on, but where
4//! we might still receive some cells.
5
6use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7use crate::stream::{AnyCmdChecker, StreamSendFlowControl, StreamStatus};
8use crate::{Error, Result};
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10
11/// Type to track state of half-closed streams.
12///
13/// A half-closed stream is one where we've sent an END cell, but where
14/// the other side might still send us data.
15///
16/// We need to track these streams instead of forgetting about them entirely,
17/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19#[derive(Debug)]
20pub(super) struct HalfStream {
21    /// Send flow control for this stream. Used to detect whether we get too
22    /// many SENDME cells.
23    send_flow_control: StreamSendFlowControl,
24    /// Receive window for this stream. Used to detect whether we get too
25    /// many data cells.
26    recvw: StreamRecvWindow,
27    /// Object to tell us which cells to accept on this stream.
28    cmd_checker: AnyCmdChecker,
29}
30
31impl HalfStream {
32    /// Create a new half-closed stream.
33    pub(super) fn new(
34        send_flow_control: StreamSendFlowControl,
35        recvw: StreamRecvWindow,
36        cmd_checker: AnyCmdChecker,
37    ) -> Self {
38        HalfStream {
39            send_flow_control,
40            recvw,
41            cmd_checker,
42        }
43    }
44
45    /// Process an incoming message and adjust this HalfStream accordingly.
46    /// Give an error if the protocol has been violated.
47    ///
48    /// The caller must handle END cells; it is an internal error to pass
49    /// END cells to this method.
50    /// no ends here.
51    pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
52        use tor_cell::relaycell::msg::Sendme;
53        use StreamStatus::*;
54        if msg.cmd() == RelayCmd::SENDME {
55            // We handle SENDME separately, and don't give it to the checker.
56            let _ = msg
57                .decode::<Sendme>()
58                .map_err(|e| Error::from_bytes_err(e, "SENDME on half-closed stream"))?;
59            self.send_flow_control.put_for_incoming_sendme()?;
60            return Ok(Open);
61        }
62
63        if cmd_counts_towards_windows(msg.cmd()) {
64            self.recvw.take()?;
65        }
66
67        let status = self.cmd_checker.check_msg(&msg)?;
68        self.cmd_checker.consume_checked_msg(msg)?;
69        Ok(status)
70    }
71}
72
73#[cfg(test)]
74mod test {
75    // @@ begin test lint list maintained by maint/add_warning @@
76    #![allow(clippy::bool_assert_comparison)]
77    #![allow(clippy::clone_on_copy)]
78    #![allow(clippy::dbg_macro)]
79    #![allow(clippy::mixed_attributes_style)]
80    #![allow(clippy::print_stderr)]
81    #![allow(clippy::print_stdout)]
82    #![allow(clippy::single_char_pattern)]
83    #![allow(clippy::unwrap_used)]
84    #![allow(clippy::unchecked_duration_subtraction)]
85    #![allow(clippy::useless_vec)]
86    #![allow(clippy::needless_pass_by_value)]
87    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
88    use super::*;
89    use crate::{
90        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
91        stream::DataCmdChecker,
92    };
93    use rand::{CryptoRng, Rng};
94    use tor_basic_utils::test_rng::testing_rng;
95    use tor_cell::relaycell::{
96        msg::{self, AnyRelayMsg},
97        AnyRelayMsgOuter, RelayCellFormat, StreamId,
98    };
99
100    fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
101        UnparsedRelayMsg::from_singleton_body(
102            RelayCellFormat::V0,
103            AnyRelayMsgOuter::new(StreamId::new(77), val)
104                .encode(RelayCellFormat::V0, rng)
105                .expect("encoding failed"),
106        )
107        .unwrap()
108    }
109
110    #[test]
111    fn halfstream_sendme() {
112        let mut rng = testing_rng();
113
114        // Stream level SENDMEs are not authenticated and so the only way to make sure we were not
115        // expecting one is if the window busts its maximum.
116        //
117        // Starting the window at 450, the first SENDME will increment it to 500 (the maximum)
118        // meaning that the second SENDME will bust that and we'll noticed that it was unexpected.
119        let sendw = StreamSendWindow::new(450);
120
121        let mut hs = HalfStream::new(
122            StreamSendFlowControl::new_window_based(sendw),
123            StreamRecvWindow::new(20),
124            DataCmdChecker::new_any(),
125        );
126
127        // one sendme is fine
128        let m = msg::Sendme::new_empty();
129        assert!(hs
130            .handle_msg(to_unparsed(&mut rng, m.clone().into()))
131            .is_ok());
132        // but no more were expected!
133        let e = hs
134            .handle_msg(to_unparsed(&mut rng, m.into()))
135            .err()
136            .unwrap();
137        assert_eq!(
138            format!("{}", e),
139            "Circuit protocol violation: Unexpected stream SENDME"
140        );
141    }
142
143    fn hs_new() -> HalfStream {
144        HalfStream::new(
145            StreamSendFlowControl::new_window_based(StreamSendWindow::new(20)),
146            StreamRecvWindow::new(20),
147            DataCmdChecker::new_any(),
148        )
149    }
150
151    #[test]
152    fn halfstream_data() {
153        let mut hs = hs_new();
154        let mut rng = testing_rng();
155
156        // we didn't give a connected cell during setup, so do it now.
157        hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
158            .unwrap();
159
160        // 20 data cells are okay.
161        let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
162        for _ in 0_u8..20 {
163            assert!(hs
164                .handle_msg(to_unparsed(&mut rng, m.clone().into()))
165                .is_ok());
166        }
167
168        // But one more is a protocol violation.
169        let e = hs
170            .handle_msg(to_unparsed(&mut rng, m.into()))
171            .err()
172            .unwrap();
173        assert_eq!(
174            format!("{}", e),
175            "Circuit protocol violation: Received a data cell in violation of a window"
176        );
177    }
178
179    #[test]
180    fn halfstream_connected() {
181        let mut hs = hs_new();
182        let mut rng = testing_rng();
183        // We were told to accept a connected, so we'll accept one
184        // and no more.
185        let m = msg::Connected::new_empty();
186        assert!(hs
187            .handle_msg(to_unparsed(&mut rng, m.clone().into()))
188            .is_ok());
189        assert!(hs
190            .handle_msg(to_unparsed(&mut rng, m.clone().into()))
191            .is_err());
192
193        // If we try that again _after getting a connected_,
194        // accept any.
195        let mut cmd_checker = DataCmdChecker::new_any();
196        {
197            cmd_checker
198                .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
199                .unwrap();
200        }
201        let mut hs = HalfStream::new(
202            StreamSendFlowControl::new_window_based(StreamSendWindow::new(20)),
203            StreamRecvWindow::new(20),
204            cmd_checker,
205        );
206        let e = hs
207            .handle_msg(to_unparsed(&mut rng, m.into()))
208            .err()
209            .unwrap();
210        assert_eq!(
211            format!("{}", e),
212            "Stream protocol violation: Received CONNECTED twice on a stream."
213        );
214    }
215
216    #[test]
217    fn halfstream_other() {
218        let mut hs = hs_new();
219        let mut rng = testing_rng();
220        let m = msg::Extended2::new(Vec::new());
221        let e = hs
222            .handle_msg(to_unparsed(&mut rng, m.into()))
223            .err()
224            .unwrap();
225        assert_eq!(
226            format!("{}", e),
227            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
228        );
229    }
230}