1//! Type and code for handling a "half-closed" stream.
2//!
3//! A half-closed stream is one that we've sent an END on, but where
4//! we might still receive some cells.
56use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7use crate::stream::{AnyCmdChecker, StreamSendFlowControl, StreamStatus};
8use crate::{Error, Result};
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
1011/// Type to track state of half-closed streams.
12///
13/// A half-closed stream is one where we've sent an END cell, but where
14/// the other side might still send us data.
15///
16/// We need to track these streams instead of forgetting about them entirely,
17/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19#[derive(Debug)]
20pub(super) struct HalfStream {
21/// Send flow control for this stream. Used to detect whether we get too
22 /// many SENDME cells.
23send_flow_control: StreamSendFlowControl,
24/// Receive window for this stream. Used to detect whether we get too
25 /// many data cells.
26recvw: StreamRecvWindow,
27/// Object to tell us which cells to accept on this stream.
28cmd_checker: AnyCmdChecker,
29}
3031impl HalfStream {
32/// Create a new half-closed stream.
33pub(super) fn new(
34 send_flow_control: StreamSendFlowControl,
35 recvw: StreamRecvWindow,
36 cmd_checker: AnyCmdChecker,
37 ) -> Self {
38 HalfStream {
39 send_flow_control,
40 recvw,
41 cmd_checker,
42 }
43 }
4445/// Process an incoming message and adjust this HalfStream accordingly.
46 /// Give an error if the protocol has been violated.
47 ///
48 /// The caller must handle END cells; it is an internal error to pass
49 /// END cells to this method.
50 /// no ends here.
51pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
52use tor_cell::relaycell::msg::Sendme;
53use StreamStatus::*;
54if msg.cmd() == RelayCmd::SENDME {
55// We handle SENDME separately, and don't give it to the checker.
56let _ = msg
57 .decode::<Sendme>()
58 .map_err(|e| Error::from_bytes_err(e, "SENDME on half-closed stream"))?;
59self.send_flow_control.put_for_incoming_sendme()?;
60return Ok(Open);
61 }
6263if cmd_counts_towards_windows(msg.cmd()) {
64self.recvw.take()?;
65 }
6667let status = self.cmd_checker.check_msg(&msg)?;
68self.cmd_checker.consume_checked_msg(msg)?;
69Ok(status)
70 }
71}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
        stream::DataCmdChecker,
    };
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
        msg::{self, AnyRelayMsg},
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
    };

    /// Encode `val` as a V0 relay cell on stream 77, and wrap the encoded body
    /// as an `UnparsedRelayMsg`.
    fn to_unparsed<R>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg
    where
        R: Rng + CryptoRng,
    {
        let body = AnyRelayMsgOuter::new(StreamId::new(77), val)
            .encode(RelayCellFormat::V0, rng)
            .expect("encoding failed");
        UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
    }

    #[test]
    fn halfstream_sendme() {
        let mut rng = testing_rng();

        // Stream-level SENDMEs are not authenticated, so the only way to tell
        // that one was unexpected is for the window to exceed its maximum.
        //
        // With the window starting at 450, the first SENDME raises it to 500
        // (the maximum); the second one goes past that, and so we notice that
        // it was unexpected.
        let flow_control = StreamSendFlowControl::new_window_based(StreamSendWindow::new(450));
        let mut hs = HalfStream::new(
            flow_control,
            StreamRecvWindow::new(20),
            DataCmdChecker::new_any(),
        );

        let sendme = msg::Sendme::new_empty();

        // The first SENDME is accepted...
        let first = hs.handle_msg(to_unparsed(&mut rng, sendme.clone().into()));
        assert!(first.is_ok());

        // ...but nobody was expecting a second one.
        let err = hs
            .handle_msg(to_unparsed(&mut rng, sendme.into()))
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "Circuit protocol violation: Unexpected stream SENDME"
        );
    }

    /// Build a `HalfStream` with send and receive windows of 20, and a data
    /// command checker in its initial state.
    fn hs_new() -> HalfStream {
        let flow_control = StreamSendFlowControl::new_window_based(StreamSendWindow::new(20));
        let recv_window = StreamRecvWindow::new(20);
        HalfStream::new(flow_control, recv_window, DataCmdChecker::new_any())
    }

    #[test]
    fn halfstream_data() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // Setup did not include a CONNECTED message, so deliver one first.
        let connected = msg::Connected::new_empty();
        hs.handle_msg(to_unparsed(&mut rng, connected.into()))
            .unwrap();

        // The receive window has room for exactly 20 data messages.
        let data = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0..20 {
            let outcome = hs.handle_msg(to_unparsed(&mut rng, data.clone().into()));
            assert!(outcome.is_ok());
        }

        // The 21st one is a protocol violation.
        let err = hs
            .handle_msg(to_unparsed(&mut rng, data.into()))
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "Circuit protocol violation: Received a data cell in violation of a window"
        );
    }

    #[test]
    fn halfstream_connected() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A fresh checker accepts a single CONNECTED message, and then
        // rejects the next one.
        let connected = msg::Connected::new_empty();
        let first = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(first.is_ok());
        let second = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(second.is_err());

        // If the checker has already seen a CONNECTED message before the
        // stream became half-closed, then even the first CONNECTED that the
        // half-stream receives must be rejected.
        let mut cmd_checker = DataCmdChecker::new_any();
        let earlier = to_unparsed(&mut rng, msg::Connected::new_empty().into());
        cmd_checker.check_msg(&earlier).unwrap();

        let mut hs = HalfStream::new(
            StreamSendFlowControl::new_window_based(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            cmd_checker,
        );
        let err = hs
            .handle_msg(to_unparsed(&mut rng, connected.into()))
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "Stream protocol violation: Received CONNECTED twice on a stream."
        );
    }

    #[test]
    fn halfstream_other() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A message that does not belong on a data stream is rejected.
        let extended = msg::Extended2::new(Vec::new());
        let err = hs
            .handle_msg(to_unparsed(&mut rng, extended.into()))
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
        );
    }
}