1//! Functionality for incoming streams, opened from the other side of a circuit.
23use bitvec::prelude::*;
45use super::{AnyCmdChecker, DataStream, StreamReader, StreamStatus};
6use crate::circuit::ClientCircSyncView;
7use crate::memquota::StreamAccount;
8use crate::tunnel::reactor::CloseStreamBehavior;
9use crate::tunnel::StreamTarget;
10use crate::{Error, Result};
11use derive_deftly::Deftly;
12use oneshot_fused_workaround as oneshot;
13use tor_cell::relaycell::{msg, RelayCmd, UnparsedRelayMsg};
14use tor_cell::restricted_msg;
15use tor_error::internal;
16use tor_memquota::derive_deftly_template_HasMemoryCost;
1718/// A pending request from the other end of the circuit for us to open a new
19/// stream.
20///
21/// Exits, directory caches, and onion services expect to receive these; others
22/// do not.
23///
24/// On receiving one of these objects, the party handling it should accept it or
25/// reject it. If it is dropped without being explicitly handled, a reject
26/// message will be sent anyway.
27#[derive(Debug)]
28pub struct IncomingStream {
29/// The message that the client sent us to begin the stream.
30request: IncomingStreamRequest,
31/// The information that we'll use to wire up the stream, if it is accepted.
32stream: StreamTarget,
33/// The underlying `StreamReader`.
34reader: StreamReader,
35/// The memory quota account that should be used for this stream's data
36memquota: StreamAccount,
37}
3839impl IncomingStream {
40/// Create a new `IncomingStream`.
41pub(crate) fn new(
42 request: IncomingStreamRequest,
43 stream: StreamTarget,
44 reader: StreamReader,
45 memquota: StreamAccount,
46 ) -> Self {
47Self {
48 request,
49 stream,
50 reader,
51 memquota,
52 }
53 }
5455/// Return the underlying message that was used to try to begin this stream.
56pub fn request(&self) -> &IncomingStreamRequest {
57&self.request
58 }
5960/// Accept this stream as a new [`DataStream`], and send the client a
61 /// message letting them know the stream was accepted.
62pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
63let Self {
64 request,
65mut stream,
66 reader,
67 memquota,
68 } = self;
6970match request {
71 IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
72 stream.send(message.into()).await?;
73Ok(DataStream::new_connected(reader, stream, memquota))
74 }
75 IncomingStreamRequest::Resolve(_) => {
76Err(internal!("Cannot accept data on a RESOLVE stream").into())
77 }
78 }
79 }
8081/// Reject this request and send an error message to the client.
82pub async fn reject(mut self, message: msg::End) -> Result<()> {
83let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
8485 rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
86 }
8788/// Reject this request and possibly send an error message to the client.
89 ///
90 /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
91fn reject_inner(
92&mut self,
93 message: CloseStreamBehavior,
94 ) -> Result<oneshot::Receiver<Result<()>>> {
95self.stream.close_pending(message)
96 }
9798/// Ignore this request without replying to the client.
99 ///
100 /// (If you drop an [`IncomingStream`] without calling `accept_data`,
101 /// `reject`, or this method, the drop handler will cause it to be
102 /// rejected.)
103pub async fn discard(mut self) -> Result<()> {
104let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
105106 rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
107 }
108}
109110// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
111// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
112// circuit reactor will see a close on its mpsc::Receiver, and the circuit
113// reactor will itself send an End.
114115restricted_msg! {
116/// The allowed incoming messages on an `IncomingStream`.
117#[derive(Clone, Debug, Deftly)]
118 #[derive_deftly(HasMemoryCost)]
119 #[non_exhaustive]
120pub enum IncomingStreamRequest: RelayMsg {
121/// A BEGIN message.
122Begin,
123/// A BEGIN_DIR message.
124BeginDir,
125/// A RESOLVE message.
126Resolve,
127 }
128}
129130/// Bit-vector used to represent a list of permitted commands.
131///
132/// This is cheaper and faster than using a vec, and avoids side-channel
133/// attacks.
134type RelayCmdSet = bitvec::BitArr!(for 256);
135136/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
137/// have a non-zero stream ID.
138#[derive(Debug)]
139pub(crate) struct IncomingCmdChecker {
140/// The "begin" commands that can be received on this type of circuit:
141 ///
142 /// * onion service circuits only accept `BEGIN`
143 /// * all relay circuits accept `BEGIN_DIR`
144 /// * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
145 /// * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
146 /// as well
147allow_commands: RelayCmdSet,
148}
149150impl IncomingCmdChecker {
151/// Create a new boxed `IncomingCmdChecker`.
152pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
153let mut array = BitArray::ZERO;
154for c in allow_commands {
155 array.set(u8::from(*c) as usize, true);
156 }
157 Box::new(Self {
158 allow_commands: array,
159 })
160 }
161}
162163impl super::CmdChecker for IncomingCmdChecker {
164fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
165if self.allow_commands[u8::from(msg.cmd()) as usize] {
166Ok(StreamStatus::Open)
167 } else {
168Err(Error::StreamProto(format!(
169"Unexpected {} on incoming stream",
170 msg.cmd()
171 )))
172 }
173 }
174175fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
176let _ = msg
177 .decode::<IncomingStreamRequest>()
178 .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
179180Ok(())
181 }
182}
183184/// A callback that can check whether a given stream request is acceptable
185/// immediately on its receipt.
186///
187/// This should only be used for checks that need to be done immediately, with a
188/// view of the state of the circuit. Any other checks should, if possible, be
189/// done on the [`IncomingStream`] objects as they are received.
190pub trait IncomingStreamRequestFilter: Send + 'static {
191/// Check an incoming stream request, and decide what to do with it.
192 ///
193 /// Implementations of this function should
194fn disposition(
195&mut self,
196 ctx: &IncomingStreamRequestContext<'_>,
197 circ: &ClientCircSyncView<'_>,
198 ) -> Result<IncomingStreamRequestDisposition>;
199}
200201/// What action to take with an incoming stream request.
202#[derive(Clone, Debug)]
203#[non_exhaustive]
204pub enum IncomingStreamRequestDisposition {
205/// Accept the request (for now) and pass it to the mpsc::Receiver
206 /// that is yielding them as [`IncomingStream``
207Accept,
208/// Rejected the request, and close the circuit on which it was received.
209CloseCircuit,
210/// Reject the request and send an END message.
211RejectRequest(msg::End),
212}
213214/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
215pub struct IncomingStreamRequestContext<'a> {
216/// The request message itself
217pub(crate) request: &'a IncomingStreamRequest,
218}
219220impl<'a> IncomingStreamRequestContext<'a> {
221/// Return a reference to the message used to request this stream.
222pub fn request(&self) -> &'a IncomingStreamRequest {
223self.request
224 }
225}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_cell::relaycell::{
        msg::{Begin, BeginDir, Data, Resolve},
        AnyRelayMsgOuter, RelayCellFormat,
    };

    use super::*;

    #[test]
    fn incoming_cmd_checker() {
        /// Assert that `checker` reports every message in `accepted` as
        /// `StreamStatus::Open`, and then that it rejects every message in
        /// `rejected`.
        fn expect(
            checker: &mut AnyCmdChecker,
            accepted: &[&UnparsedRelayMsg],
            rejected: &[&UnparsedRelayMsg],
        ) {
            for m in accepted {
                assert_eq!(checker.check_msg(m).unwrap(), StreamStatus::Open);
            }
            for m in rejected {
                assert!(checker.check_msg(m).is_err());
            }
        }

        // Encode a relay message into a V0 cell body, and wrap that body as
        // an `UnparsedRelayMsg`.
        let unparsed = |msg| {
            let body = AnyRelayMsgOuter::new(None, msg)
                .encode(RelayCellFormat::V0, &mut rand::rng())
                .unwrap();
            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
        };
        let begin = unparsed(Begin::new("allium.example.com", 443, 0).unwrap().into());
        let begin_dir = unparsed(BeginDir::default().into());
        let resolve = unparsed(Resolve::new("allium.example.com").into());
        let data = unparsed(Data::new(&[1, 2, 3]).unwrap().into());

        // A checker with no permitted commands rejects everything.
        let mut none_allowed = IncomingCmdChecker::new_any(&[]);
        expect(
            &mut none_allowed,
            &[],
            &[&begin, &begin_dir, &resolve, &data],
        );

        // A checker that permits only BEGIN rejects every other command.
        let mut begin_only = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
        expect(&mut begin_only, &[&begin], &[&begin_dir, &resolve, &data]);

        // A checker that permits all three "begin" commands still rejects DATA.
        let mut all_begins = IncomingCmdChecker::new_any(&[
            RelayCmd::BEGIN,
            RelayCmd::BEGIN_DIR,
            RelayCmd::RESOLVE,
        ]);
        expect(&mut all_begins, &[&begin, &begin_dir, &resolve], &[&data]);
    }
}