1
//! Functionality for incoming streams, opened from the other side of a circuit.
2

            
3
use bitvec::prelude::*;
4

            
5
use super::{AnyCmdChecker, DataStream, StreamReader, StreamStatus};
6
use crate::circuit::ClientCircSyncView;
7
use crate::memquota::StreamAccount;
8
use crate::tunnel::reactor::CloseStreamBehavior;
9
use crate::tunnel::StreamTarget;
10
use crate::{Error, Result};
11
use derive_deftly::Deftly;
12
use oneshot_fused_workaround as oneshot;
13
use tor_cell::relaycell::{msg, RelayCmd, UnparsedRelayMsg};
14
use tor_cell::restricted_msg;
15
use tor_error::internal;
16
use tor_memquota::derive_deftly_template_HasMemoryCost;
17

            
18
/// A pending request from the other end of the circuit for us to open a new
19
/// stream.
20
///
21
/// Exits, directory caches, and onion services expect to receive these; others
22
/// do not.
23
///
24
/// On receiving one of these objects, the party handling it should accept it or
25
/// reject it.  If it is dropped without being explicitly handled, a reject
26
/// message will be sent anyway.
27
#[derive(Debug)]
28
pub struct IncomingStream {
29
    /// The message that the client sent us to begin the stream.
30
    request: IncomingStreamRequest,
31
    /// The information that we'll use to wire up the stream, if it is accepted.
32
    stream: StreamTarget,
33
    /// The underlying `StreamReader`.
34
    reader: StreamReader,
35
    /// The memory quota account that should be used for this stream's data
36
    memquota: StreamAccount,
37
}
38

            
39
impl IncomingStream {
40
    /// Create a new `IncomingStream`.
41
24
    pub(crate) fn new(
42
24
        request: IncomingStreamRequest,
43
24
        stream: StreamTarget,
44
24
        reader: StreamReader,
45
24
        memquota: StreamAccount,
46
24
    ) -> Self {
47
24
        Self {
48
24
            request,
49
24
            stream,
50
24
            reader,
51
24
            memquota,
52
24
        }
53
24
    }
54

            
55
    /// Return the underlying message that was used to try to begin this stream.
56
    pub fn request(&self) -> &IncomingStreamRequest {
57
        &self.request
58
    }
59

            
60
    /// Accept this stream as a new [`DataStream`], and send the client a
61
    /// message letting them know the stream was accepted.
62
24
    pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
63
16
        let Self {
64
16
            request,
65
16
            mut stream,
66
16
            reader,
67
16
            memquota,
68
16
        } = self;
69
16

            
70
16
        match request {
71
            IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
72
16
                stream.send(message.into()).await?;
73
16
                Ok(DataStream::new_connected(reader, stream, memquota))
74
            }
75
            IncomingStreamRequest::Resolve(_) => {
76
                Err(internal!("Cannot accept data on a RESOLVE stream").into())
77
            }
78
        }
79
16
    }
80

            
81
    /// Reject this request and send an error message to the client.
82
12
    pub async fn reject(mut self, message: msg::End) -> Result<()> {
83
8
        let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
84

            
85
8
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
86
8
    }
87

            
88
    /// Reject this request and possibly send an error message to the client.
89
    ///
90
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
91
8
    fn reject_inner(
92
8
        &mut self,
93
8
        message: CloseStreamBehavior,
94
8
    ) -> Result<oneshot::Receiver<Result<()>>> {
95
8
        self.stream.close_pending(message)
96
8
    }
97

            
98
    /// Ignore this request without replying to the client.
99
    ///
100
    /// (If you drop an [`IncomingStream`] without calling `accept_data`,
101
    /// `reject`, or this method, the drop handler will cause it to be
102
    /// rejected.)
103
    pub async fn discard(mut self) -> Result<()> {
104
        let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
105

            
106
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
107
    }
108
}
109

            
110
// NOTE: We do not need to `impl Drop for IncomingStream { .. }`.  Dropping its
// StreamTarget drops the internal mpsc::Sender; the circuit reactor then sees
// its mpsc::Receiver close, and sends an End itself.
114

            
115
restricted_msg! {
    /// The messages that are allowed to arrive on an `IncomingStream`.
    #[derive(Clone, Debug, Deftly)]
    #[derive_deftly(HasMemoryCost)]
    #[non_exhaustive]
    pub enum IncomingStreamRequest: RelayMsg {
        /// A `BEGIN` message.
        Begin,
        /// A `BEGIN_DIR` message.
        BeginDir,
        /// A `RESOLVE` message.
        Resolve,
    }
}
129

            
130
/// Bit-vector used to represent a list of permitted commands.
131
///
132
/// This is cheaper and faster than using a vec, and avoids side-channel
133
/// attacks.
134
type RelayCmdSet = bitvec::BitArr!(for 256);
135

            
136
/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
137
/// have a non-zero stream ID.
138
#[derive(Debug)]
139
pub(crate) struct IncomingCmdChecker {
140
    /// The "begin" commands that can be received on this type of circuit:
141
    ///
142
    ///   * onion service circuits only accept `BEGIN`
143
    ///   * all relay circuits accept `BEGIN_DIR`
144
    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
145
    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
146
    ///     as well
147
    allow_commands: RelayCmdSet,
148
}
149

            
150
impl IncomingCmdChecker {
151
    /// Create a new boxed `IncomingCmdChecker`.
152
46
    pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
153
46
        let mut array = BitArray::ZERO;
154
94
        for c in allow_commands {
155
48
            array.set(u8::from(*c) as usize, true);
156
48
        }
157
46
        Box::new(Self {
158
46
            allow_commands: array,
159
46
        })
160
46
    }
161
}
162

            
163
impl super::CmdChecker for IncomingCmdChecker {
164
48
    fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
165
48
        if self.allow_commands[u8::from(msg.cmd()) as usize] {
166
32
            Ok(StreamStatus::Open)
167
        } else {
168
16
            Err(Error::StreamProto(format!(
169
16
                "Unexpected {} on incoming stream",
170
16
                msg.cmd()
171
16
            )))
172
        }
173
48
    }
174

            
175
    fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
176
        let _ = msg
177
            .decode::<IncomingStreamRequest>()
178
            .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
179

            
180
        Ok(())
181
    }
182
}
183

            
184
/// A callback that can check whether a given stream request is acceptable
185
/// immediately on its receipt.
186
///
187
/// This should only be used for checks that need to be done immediately, with a
188
/// view of the state of the circuit.  Any other checks should, if possible, be
189
/// done on the [`IncomingStream`] objects as they are received.
190
pub trait IncomingStreamRequestFilter: Send + 'static {
191
    /// Check an incoming stream request, and decide what to do with it.
192
    ///
193
    /// Implementations of this function should
194
    fn disposition(
195
        &mut self,
196
        ctx: &IncomingStreamRequestContext<'_>,
197
        circ: &ClientCircSyncView<'_>,
198
    ) -> Result<IncomingStreamRequestDisposition>;
199
}
200

            
201
/// What action to take with an incoming stream request.
202
#[derive(Clone, Debug)]
203
#[non_exhaustive]
204
pub enum IncomingStreamRequestDisposition {
205
    /// Accept the request (for now) and pass it to the mpsc::Receiver
206
    /// that is yielding them as [`IncomingStream``
207
    Accept,
208
    /// Rejected the request, and close the circuit on which it was received.
209
    CloseCircuit,
210
    /// Reject the request and send an END message.
211
    RejectRequest(msg::End),
212
}
213

            
214
/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
215
pub struct IncomingStreamRequestContext<'a> {
216
    /// The request message itself
217
    pub(crate) request: &'a IncomingStreamRequest,
218
}
219

            
220
impl<'a> IncomingStreamRequestContext<'a> {
221
    /// Return a reference to the message used to request this stream.
222
    pub fn request(&self) -> &'a IncomingStreamRequest {
223
        self.request
224
    }
225
}
226

            
227
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_cell::relaycell::{
        msg::{Begin, BeginDir, Data, Resolve},
        AnyRelayMsgOuter, RelayCellFormat,
    };

    use super::*;

    /// Check which messages each checker configuration accepts and rejects.
    #[test]
    fn incoming_cmd_checker() {
        // Encode a relay message and re-read it as an unparsed message.
        let to_unparsed = |msg| {
            let body = AnyRelayMsgOuter::new(None, msg)
                .encode(RelayCellFormat::V0, &mut rand::rng())
                .unwrap();
            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
        };
        let begin = to_unparsed(Begin::new("allium.example.com", 443, 0).unwrap().into());
        let begin_dir = to_unparsed(BeginDir::default().into());
        let resolve = to_unparsed(Resolve::new("allium.example.com").into());
        let data = to_unparsed(Data::new(&[1, 2, 3]).unwrap().into());

        // With nothing allowed, every message is rejected.
        {
            let mut checker = IncomingCmdChecker::new_any(&[]);
            for rejected in [&begin, &begin_dir, &resolve, &data] {
                assert!(checker.check_msg(rejected).is_err());
            }
        }

        // With only BEGIN allowed, only the BEGIN message is accepted.
        {
            let mut checker = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
            assert_eq!(checker.check_msg(&begin).unwrap(), StreamStatus::Open);
            for rejected in [&begin_dir, &resolve, &data] {
                assert!(checker.check_msg(rejected).is_err());
            }
        }

        // With all three "begin" commands allowed, DATA is still rejected.
        {
            let mut checker = IncomingCmdChecker::new_any(&[
                RelayCmd::BEGIN,
                RelayCmd::BEGIN_DIR,
                RelayCmd::RESOLVE,
            ]);
            for accepted in [&begin, &begin_dir, &resolve] {
                assert_eq!(checker.check_msg(accepted).unwrap(), StreamStatus::Open);
            }
            assert!(checker.check_msg(&data).is_err());
        }
    }
}