1//! Type and code for handling a "half-closed" stream.
2//!
3//! A half-closed stream is one that we've sent an END on, but where
4//! we might still receive some cells.
56use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7use crate::stream::{AnyCmdChecker, StreamSendFlowControl, StreamStatus};
8use crate::Result;
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
1011/// Type to track state of half-closed streams.
12///
13/// A half-closed stream is one where we've sent an END cell, but where
14/// the other side might still send us data.
15///
16/// We need to track these streams instead of forgetting about them entirely,
17/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19#[derive(Debug)]
20pub(super) struct HalfStream {
21/// Send flow control for this stream. Used to detect whether we get too
22 /// many SENDME cells.
23send_flow_control: StreamSendFlowControl,
24/// Receive window for this stream. Used to detect whether we get too
25 /// many data cells.
26recvw: StreamRecvWindow,
27/// Object to tell us which cells to accept on this stream.
28cmd_checker: AnyCmdChecker,
29}
3031impl HalfStream {
32/// Create a new half-closed stream.
33pub(super) fn new(
34 send_flow_control: StreamSendFlowControl,
35 recvw: StreamRecvWindow,
36 cmd_checker: AnyCmdChecker,
37 ) -> Self {
38 HalfStream {
39 send_flow_control,
40 recvw,
41 cmd_checker,
42 }
43 }
4445/// Process an incoming message and adjust this HalfStream accordingly.
46 /// Give an error if the protocol has been violated.
47 ///
48 /// The caller must handle END cells; it is an internal error to pass
49 /// END cells to this method.
50 /// no ends here.
51pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
52use StreamStatus::*;
5354// We handle SENDME/XON/XOFF separately, and don't give it to the checker.
55 //
56 // TODO: this logic is the same as `CircHop::deliver_msg_to_stream`; we should refactor this
57 // if possible
58match msg.cmd() {
59 RelayCmd::SENDME => {
60self.send_flow_control.put_for_incoming_sendme(msg)?;
61return Ok(Open);
62 }
63 RelayCmd::XON => {
64self.send_flow_control.handle_incoming_xon(msg)?;
65return Ok(Open);
66 }
67 RelayCmd::XOFF => {
68self.send_flow_control.handle_incoming_xoff(msg)?;
69return Ok(Open);
70 }
71_ => {}
72 }
7374if cmd_counts_towards_windows(msg.cmd()) {
75self.recvw.take()?;
76 }
7778let status = self.cmd_checker.check_msg(&msg)?;
79self.cmd_checker.consume_checked_msg(msg)?;
80Ok(status)
81 }
82}
8384#[cfg(test)]
85mod test {
86// @@ begin test lint list maintained by maint/add_warning @@
87#![allow(clippy::bool_assert_comparison)]
88 #![allow(clippy::clone_on_copy)]
89 #![allow(clippy::dbg_macro)]
90 #![allow(clippy::mixed_attributes_style)]
91 #![allow(clippy::print_stderr)]
92 #![allow(clippy::print_stdout)]
93 #![allow(clippy::single_char_pattern)]
94 #![allow(clippy::unwrap_used)]
95 #![allow(clippy::unchecked_duration_subtraction)]
96 #![allow(clippy::useless_vec)]
97 #![allow(clippy::needless_pass_by_value)]
98//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
99use super::*;
100use crate::{
101 congestion::sendme::{StreamRecvWindow, StreamSendWindow},
102 stream::DataCmdChecker,
103 };
104use rand::{CryptoRng, Rng};
105use tor_basic_utils::test_rng::testing_rng;
106use tor_cell::relaycell::{
107 msg::{self, AnyRelayMsg},
108 AnyRelayMsgOuter, RelayCellFormat, StreamId,
109 };
110111fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
112 UnparsedRelayMsg::from_singleton_body(
113 RelayCellFormat::V0,
114 AnyRelayMsgOuter::new(StreamId::new(77), val)
115 .encode(RelayCellFormat::V0, rng)
116 .expect("encoding failed"),
117 )
118 .unwrap()
119 }
120121#[test]
122fn halfstream_sendme() {
123let mut rng = testing_rng();
124125// Stream level SENDMEs are not authenticated and so the only way to make sure we were not
126 // expecting one is if the window busts its maximum.
127 //
128 // Starting the window at 450, the first SENDME will increment it to 500 (the maximum)
129 // meaning that the second SENDME will bust that and we'll noticed that it was unexpected.
130let sendw = StreamSendWindow::new(450);
131132let mut hs = HalfStream::new(
133 StreamSendFlowControl::new_window_based(sendw),
134 StreamRecvWindow::new(20),
135 DataCmdChecker::new_any(),
136 );
137138// one sendme is fine
139let m = msg::Sendme::new_empty();
140assert!(hs
141 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
142 .is_ok());
143// but no more were expected!
144let e = hs
145 .handle_msg(to_unparsed(&mut rng, m.into()))
146 .err()
147 .unwrap();
148assert_eq!(
149format!("{}", e),
150"Circuit protocol violation: Unexpected stream SENDME"
151);
152 }
153154fn hs_new() -> HalfStream {
155 HalfStream::new(
156 StreamSendFlowControl::new_window_based(StreamSendWindow::new(20)),
157 StreamRecvWindow::new(20),
158 DataCmdChecker::new_any(),
159 )
160 }
161162#[test]
163fn halfstream_data() {
164let mut hs = hs_new();
165let mut rng = testing_rng();
166167// we didn't give a connected cell during setup, so do it now.
168hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
169 .unwrap();
170171// 20 data cells are okay.
172let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
173for _ in 0_u8..20 {
174assert!(hs
175 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
176 .is_ok());
177 }
178179// But one more is a protocol violation.
180let e = hs
181 .handle_msg(to_unparsed(&mut rng, m.into()))
182 .err()
183 .unwrap();
184assert_eq!(
185format!("{}", e),
186"Circuit protocol violation: Received a data cell in violation of a window"
187);
188 }
189190#[test]
191fn halfstream_connected() {
192let mut hs = hs_new();
193let mut rng = testing_rng();
194// We were told to accept a connected, so we'll accept one
195 // and no more.
196let m = msg::Connected::new_empty();
197assert!(hs
198 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
199 .is_ok());
200assert!(hs
201 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
202 .is_err());
203204// If we try that again _after getting a connected_,
205 // accept any.
206let mut cmd_checker = DataCmdChecker::new_any();
207 {
208 cmd_checker
209 .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
210 .unwrap();
211 }
212let mut hs = HalfStream::new(
213 StreamSendFlowControl::new_window_based(StreamSendWindow::new(20)),
214 StreamRecvWindow::new(20),
215 cmd_checker,
216 );
217let e = hs
218 .handle_msg(to_unparsed(&mut rng, m.into()))
219 .err()
220 .unwrap();
221assert_eq!(
222format!("{}", e),
223"Stream protocol violation: Received CONNECTED twice on a stream."
224);
225 }
226227#[test]
228fn halfstream_other() {
229let mut hs = hs_new();
230let mut rng = testing_rng();
231let m = msg::Extended2::new(Vec::new());
232let e = hs
233 .handle_msg(to_unparsed(&mut rng, m.into()))
234 .err()
235 .unwrap();
236assert_eq!(
237format!("{}", e),
238"Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
239);
240 }
241}