tor_proto/stream/
incoming.rs

1//! Functionality for incoming streams, opened from the other side of a circuit.
2
3use bitvec::prelude::*;
4
5use super::{AnyCmdChecker, DataStream, StreamReader, StreamStatus};
6use crate::circuit::ClientCircSyncView;
7use crate::memquota::StreamAccount;
8use crate::tunnel::reactor::CloseStreamBehavior;
9use crate::tunnel::StreamTarget;
10use crate::{Error, Result};
11use derive_deftly::Deftly;
12use oneshot_fused_workaround as oneshot;
13use tor_cell::relaycell::{msg, RelayCmd, UnparsedRelayMsg};
14use tor_cell::restricted_msg;
15use tor_error::internal;
16use tor_memquota::derive_deftly_template_HasMemoryCost;
17use tor_rtcompat::DynTimeProvider;
18
19/// A pending request from the other end of the circuit for us to open a new
20/// stream.
21///
22/// Exits, directory caches, and onion services expect to receive these; others
23/// do not.
24///
25/// On receiving one of these objects, the party handling it should accept it or
26/// reject it.  If it is dropped without being explicitly handled, a reject
27/// message will be sent anyway.
28#[derive(Debug)]
29pub struct IncomingStream {
30    /// The runtime's time provider.
31    time_provider: DynTimeProvider,
32    /// The message that the client sent us to begin the stream.
33    request: IncomingStreamRequest,
34    /// The information that we'll use to wire up the stream, if it is accepted.
35    stream: StreamTarget,
36    /// The underlying `StreamReader`.
37    reader: StreamReader,
38    /// The memory quota account that should be used for this stream's data
39    memquota: StreamAccount,
40}
41
42impl IncomingStream {
43    /// Create a new `IncomingStream`.
44    pub(crate) fn new(
45        time_provider: DynTimeProvider,
46        request: IncomingStreamRequest,
47        stream: StreamTarget,
48        reader: StreamReader,
49        memquota: StreamAccount,
50    ) -> Self {
51        Self {
52            time_provider,
53            request,
54            stream,
55            reader,
56            memquota,
57        }
58    }
59
60    /// Return the underlying message that was used to try to begin this stream.
61    pub fn request(&self) -> &IncomingStreamRequest {
62        &self.request
63    }
64
65    /// Accept this stream as a new [`DataStream`], and send the client a
66    /// message letting them know the stream was accepted.
67    pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
68        let Self {
69            time_provider,
70            request,
71            mut stream,
72            reader,
73            memquota,
74        } = self;
75
76        match request {
77            IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
78                stream.send(message.into()).await?;
79                Ok(DataStream::new_connected(
80                    time_provider,
81                    reader,
82                    stream,
83                    memquota,
84                ))
85            }
86            IncomingStreamRequest::Resolve(_) => {
87                Err(internal!("Cannot accept data on a RESOLVE stream").into())
88            }
89        }
90    }
91
92    /// Reject this request and send an error message to the client.
93    pub async fn reject(mut self, message: msg::End) -> Result<()> {
94        let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
95
96        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
97    }
98
99    /// Reject this request and possibly send an error message to the client.
100    ///
101    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
102    fn reject_inner(
103        &mut self,
104        message: CloseStreamBehavior,
105    ) -> Result<oneshot::Receiver<Result<()>>> {
106        self.stream.close_pending(message)
107    }
108
109    /// Ignore this request without replying to the client.
110    ///
111    /// (If you drop an [`IncomingStream`] without calling `accept_data`,
112    /// `reject`, or this method, the drop handler will cause it to be
113    /// rejected.)
114    pub async fn discard(mut self) -> Result<()> {
115        let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
116
117        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
118    }
119}
120
121// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
122// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
123// circuit reactor will see a close on its mpsc::Receiver, and the circuit
124// reactor will itself send an End.
125
126restricted_msg! {
127    /// The allowed incoming messages on an `IncomingStream`.
128    #[derive(Clone, Debug, Deftly)]
129    #[derive_deftly(HasMemoryCost)]
130    #[non_exhaustive]
131    pub enum IncomingStreamRequest: RelayMsg {
132        /// A BEGIN message.
133        Begin,
134        /// A BEGIN_DIR message.
135        BeginDir,
136        /// A RESOLVE message.
137        Resolve,
138    }
139}
140
141/// Bit-vector used to represent a list of permitted commands.
142///
143/// This is cheaper and faster than using a vec, and avoids side-channel
144/// attacks.
145type RelayCmdSet = bitvec::BitArr!(for 256);
146
147/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
148/// have a non-zero stream ID.
149#[derive(Debug)]
150pub(crate) struct IncomingCmdChecker {
151    /// The "begin" commands that can be received on this type of circuit:
152    ///
153    ///   * onion service circuits only accept `BEGIN`
154    ///   * all relay circuits accept `BEGIN_DIR`
155    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
156    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
157    ///     as well
158    allow_commands: RelayCmdSet,
159}
160
161impl IncomingCmdChecker {
162    /// Create a new boxed `IncomingCmdChecker`.
163    pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
164        let mut array = BitArray::ZERO;
165        for c in allow_commands {
166            array.set(u8::from(*c) as usize, true);
167        }
168        Box::new(Self {
169            allow_commands: array,
170        })
171    }
172}
173
174impl super::CmdChecker for IncomingCmdChecker {
175    fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
176        if self.allow_commands[u8::from(msg.cmd()) as usize] {
177            Ok(StreamStatus::Open)
178        } else {
179            Err(Error::StreamProto(format!(
180                "Unexpected {} on incoming stream",
181                msg.cmd()
182            )))
183        }
184    }
185
186    fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
187        let _ = msg
188            .decode::<IncomingStreamRequest>()
189            .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
190
191        Ok(())
192    }
193}
194
195/// A callback that can check whether a given stream request is acceptable
196/// immediately on its receipt.
197///
198/// This should only be used for checks that need to be done immediately, with a
199/// view of the state of the circuit.  Any other checks should, if possible, be
200/// done on the [`IncomingStream`] objects as they are received.
201pub trait IncomingStreamRequestFilter: Send + 'static {
202    /// Check an incoming stream request, and decide what to do with it.
203    ///
204    /// Implementations of this function should
205    fn disposition(
206        &mut self,
207        ctx: &IncomingStreamRequestContext<'_>,
208        circ: &ClientCircSyncView<'_>,
209    ) -> Result<IncomingStreamRequestDisposition>;
210}
211
212/// What action to take with an incoming stream request.
213#[derive(Clone, Debug)]
214#[non_exhaustive]
215pub enum IncomingStreamRequestDisposition {
216    /// Accept the request (for now) and pass it to the mpsc::Receiver
217    /// that is yielding them as [`IncomingStream``
218    Accept,
219    /// Rejected the request, and close the circuit on which it was received.
220    CloseCircuit,
221    /// Reject the request and send an END message.
222    RejectRequest(msg::End),
223}
224
225/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
226pub struct IncomingStreamRequestContext<'a> {
227    /// The request message itself
228    pub(crate) request: &'a IncomingStreamRequest,
229}
230
231impl<'a> IncomingStreamRequestContext<'a> {
232    /// Return a reference to the message used to request this stream.
233    pub fn request(&self) -> &'a IncomingStreamRequest {
234        self.request
235    }
236}
237
238#[cfg(test)]
239mod test {
240    // @@ begin test lint list maintained by maint/add_warning @@
241    #![allow(clippy::bool_assert_comparison)]
242    #![allow(clippy::clone_on_copy)]
243    #![allow(clippy::dbg_macro)]
244    #![allow(clippy::mixed_attributes_style)]
245    #![allow(clippy::print_stderr)]
246    #![allow(clippy::print_stdout)]
247    #![allow(clippy::single_char_pattern)]
248    #![allow(clippy::unwrap_used)]
249    #![allow(clippy::unchecked_duration_subtraction)]
250    #![allow(clippy::useless_vec)]
251    #![allow(clippy::needless_pass_by_value)]
252    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
253
254    use tor_cell::relaycell::{
255        msg::{Begin, BeginDir, Data, Resolve},
256        AnyRelayMsgOuter, RelayCellFormat,
257    };
258
259    use super::*;
260
261    #[test]
262    fn incoming_cmd_checker() {
263        // Convert an AnyRelayMsg to an UnparsedRelayCell.
264        let u = |msg| {
265            let body = AnyRelayMsgOuter::new(None, msg)
266                .encode(RelayCellFormat::V0, &mut rand::rng())
267                .unwrap();
268            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
269        };
270        let begin = u(Begin::new("allium.example.com", 443, 0).unwrap().into());
271        let begin_dir = u(BeginDir::default().into());
272        let resolve = u(Resolve::new("allium.example.com").into());
273        let data = u(Data::new(&[1, 2, 3]).unwrap().into());
274
275        {
276            let mut cc_none = IncomingCmdChecker::new_any(&[]);
277            for m in [&begin, &begin_dir, &resolve, &data] {
278                assert!(cc_none.check_msg(m).is_err());
279            }
280        }
281
282        {
283            let mut cc_begin = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
284            assert_eq!(cc_begin.check_msg(&begin).unwrap(), StreamStatus::Open);
285            for m in [&begin_dir, &resolve, &data] {
286                assert!(cc_begin.check_msg(m).is_err());
287            }
288        }
289
290        {
291            let mut cc_any = IncomingCmdChecker::new_any(&[
292                RelayCmd::BEGIN,
293                RelayCmd::BEGIN_DIR,
294                RelayCmd::RESOLVE,
295            ]);
296            for m in [&begin, &begin_dir, &resolve] {
297                assert_eq!(cc_any.check_msg(m).unwrap(), StreamStatus::Open);
298            }
299            assert!(cc_any.check_msg(&data).is_err());
300        }
301    }
302}