1
//! Functionality for incoming streams, opened from the other side of a circuit.
2

            
3
use bitvec::prelude::*;
4

            
5
use super::{AnyCmdChecker, DataStream, StreamStatus};
6
use crate::circuit::{ClientCircSyncView, StreamComponents};
7
use crate::tunnel::reactor::CloseStreamBehavior;
8
use crate::{Error, Result};
9
use derive_deftly::Deftly;
10
use oneshot_fused_workaround as oneshot;
11
use tor_cell::relaycell::{msg, RelayCmd, UnparsedRelayMsg};
12
use tor_cell::restricted_msg;
13
use tor_error::internal;
14
use tor_memquota::derive_deftly_template_HasMemoryCost;
15
use tor_rtcompat::DynTimeProvider;
16

            
17
/// A pending request from the other end of the circuit for us to open a new
18
/// stream.
19
///
20
/// Exits, directory caches, and onion services expect to receive these; others
21
/// do not.
22
///
23
/// On receiving one of these objects, the party handling it should accept it or
24
/// reject it.  If it is dropped without being explicitly handled, a reject
25
/// message will be sent anyway.
26
#[derive(Debug)]
27
pub struct IncomingStream {
28
    /// The runtime's time provider.
29
    time_provider: DynTimeProvider,
30
    /// The message that the client sent us to begin the stream.
31
    request: IncomingStreamRequest,
32
    /// Stream components used to assemble the [`DataStream`].
33
    components: StreamComponents,
34
}
35

            
36
impl IncomingStream {
37
    /// Create a new `IncomingStream`.
38
24
    pub(crate) fn new(
39
24
        time_provider: DynTimeProvider,
40
24
        request: IncomingStreamRequest,
41
24
        components: StreamComponents,
42
24
    ) -> Self {
43
24
        Self {
44
24
            time_provider,
45
24
            request,
46
24
            components,
47
24
        }
48
24
    }
49

            
50
    /// Return the underlying message that was used to try to begin this stream.
51
    pub fn request(&self) -> &IncomingStreamRequest {
52
        &self.request
53
    }
54

            
55
    /// Accept this stream as a new [`DataStream`], and send the client a
56
    /// message letting them know the stream was accepted.
57
24
    pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
58
16
        let Self {
59
16
            time_provider,
60
16
            request,
61
16
            components:
62
16
                StreamComponents {
63
16
                    mut target,
64
16
                    stream_receiver,
65
16
                    xon_xoff_reader_ctrl,
66
16
                    memquota,
67
16
                },
68
16
        } = self;
69
16

            
70
16
        match request {
71
            IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
72
16
                target.send(message.into()).await?;
73
16
                Ok(DataStream::new_connected(
74
16
                    time_provider,
75
16
                    stream_receiver,
76
16
                    xon_xoff_reader_ctrl,
77
16
                    target,
78
16
                    memquota,
79
16
                ))
80
            }
81
            IncomingStreamRequest::Resolve(_) => {
82
                Err(internal!("Cannot accept data on a RESOLVE stream").into())
83
            }
84
        }
85
16
    }
86

            
87
    /// Reject this request and send an error message to the client.
88
12
    pub async fn reject(mut self, message: msg::End) -> Result<()> {
89
8
        let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
90

            
91
8
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
92
8
    }
93

            
94
    /// Reject this request and possibly send an error message to the client.
95
    ///
96
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
97
8
    fn reject_inner(
98
8
        &mut self,
99
8
        message: CloseStreamBehavior,
100
8
    ) -> Result<oneshot::Receiver<Result<()>>> {
101
8
        self.components.target.close_pending(message)
102
8
    }
103

            
104
    /// Ignore this request without replying to the client.
105
    ///
106
    /// (If you drop an [`IncomingStream`] without calling `accept_data`,
107
    /// `reject`, or this method, the drop handler will cause it to be
108
    /// rejected.)
109
    pub async fn discard(mut self) -> Result<()> {
110
        let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
111

            
112
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
113
    }
114
}
115

            
116
// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
// circuit reactor will see a close on its mpsc::Receiver, and the circuit
// reactor will itself send an End.
120

            
121
restricted_msg! {
    /// The messages that are allowed to arrive as the request for an
    /// `IncomingStream`.
    #[derive(Clone, Debug, Deftly)]
    #[derive_deftly(HasMemoryCost)]
    #[non_exhaustive]
    pub enum IncomingStreamRequest: RelayMsg {
        /// A BEGIN message.
        Begin,
        /// A BEGIN_DIR message.
        BeginDir,
        /// A RESOLVE message.
        Resolve,
    }
}
135

            
136
/// Bit-vector used to represent a list of permitted commands.
137
///
138
/// This is cheaper and faster than using a vec, and avoids side-channel
139
/// attacks.
140
type RelayCmdSet = bitvec::BitArr!(for 256);
141

            
142
/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
143
/// have a non-zero stream ID.
144
#[derive(Debug)]
145
pub(crate) struct IncomingCmdChecker {
146
    /// The "begin" commands that can be received on this type of circuit:
147
    ///
148
    ///   * onion service circuits only accept `BEGIN`
149
    ///   * all relay circuits accept `BEGIN_DIR`
150
    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
151
    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
152
    ///     as well
153
    allow_commands: RelayCmdSet,
154
}
155

            
156
impl IncomingCmdChecker {
157
    /// Create a new boxed `IncomingCmdChecker`.
158
46
    pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
159
46
        let mut array = BitArray::ZERO;
160
94
        for c in allow_commands {
161
48
            array.set(u8::from(*c) as usize, true);
162
48
        }
163
46
        Box::new(Self {
164
46
            allow_commands: array,
165
46
        })
166
46
    }
167
}
168

            
169
impl super::CmdChecker for IncomingCmdChecker {
170
48
    fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
171
48
        if self.allow_commands[u8::from(msg.cmd()) as usize] {
172
32
            Ok(StreamStatus::Open)
173
        } else {
174
16
            Err(Error::StreamProto(format!(
175
16
                "Unexpected {} on incoming stream",
176
16
                msg.cmd()
177
16
            )))
178
        }
179
48
    }
180

            
181
    fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
182
        let _ = msg
183
            .decode::<IncomingStreamRequest>()
184
            .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
185

            
186
        Ok(())
187
    }
188
}
189

            
190
/// A callback that can check whether a given stream request is acceptable
191
/// immediately on its receipt.
192
///
193
/// This should only be used for checks that need to be done immediately, with a
194
/// view of the state of the circuit.  Any other checks should, if possible, be
195
/// done on the [`IncomingStream`] objects as they are received.
196
pub trait IncomingStreamRequestFilter: Send + 'static {
197
    /// Check an incoming stream request, and decide what to do with it.
198
    ///
199
    /// Implementations of this function should
200
    fn disposition(
201
        &mut self,
202
        ctx: &IncomingStreamRequestContext<'_>,
203
        circ: &ClientCircSyncView<'_>,
204
    ) -> Result<IncomingStreamRequestDisposition>;
205
}
206

            
207
/// What action to take with an incoming stream request.
208
#[derive(Clone, Debug)]
209
#[non_exhaustive]
210
pub enum IncomingStreamRequestDisposition {
211
    /// Accept the request (for now) and pass it to the mpsc::Receiver
212
    /// that is yielding them as [`IncomingStream``
213
    Accept,
214
    /// Rejected the request, and close the circuit on which it was received.
215
    CloseCircuit,
216
    /// Reject the request and send an END message.
217
    RejectRequest(msg::End),
218
}
219

            
220
/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
221
pub struct IncomingStreamRequestContext<'a> {
222
    /// The request message itself
223
    pub(crate) request: &'a IncomingStreamRequest,
224
}
225

            
226
impl<'a> IncomingStreamRequestContext<'a> {
227
    /// Return a reference to the message used to request this stream.
228
    pub fn request(&self) -> &'a IncomingStreamRequest {
229
        self.request
230
    }
231
}
232

            
233
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_cell::relaycell::{
        msg::{Begin, BeginDir, Data, Resolve},
        AnyRelayMsgOuter, RelayCellFormat,
    };

    use super::*;

    #[test]
    fn incoming_cmd_checker() {
        // Encode a relay message into a V0 cell body, then read it back as
        // an `UnparsedRelayMsg`.
        let to_unparsed = |msg| {
            let encoded = AnyRelayMsgOuter::new(None, msg)
                .encode(RelayCellFormat::V0, &mut rand::rng())
                .unwrap();
            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, encoded).unwrap()
        };
        let begin = to_unparsed(Begin::new("allium.example.com", 443, 0).unwrap().into());
        let begin_dir = to_unparsed(BeginDir::default().into());
        let resolve = to_unparsed(Resolve::new("allium.example.com").into());
        let data = to_unparsed(Data::new(&[1, 2, 3]).unwrap().into());

        // The messages offered to every checker, in a fixed order.
        let probes = [&begin, &begin_dir, &resolve, &data];

        // Each scenario pairs a permitted-command list with the verdict
        // expected for each probe above (`true` means "accepted as open").
        let scenarios: [(&[RelayCmd], [bool; 4]); 3] = [
            // Nothing permitted: everything is refused.
            (&[], [false, false, false, false]),
            // Only BEGIN permitted.
            (&[RelayCmd::BEGIN], [true, false, false, false]),
            // All three request commands permitted; DATA is still refused.
            (
                &[RelayCmd::BEGIN, RelayCmd::BEGIN_DIR, RelayCmd::RESOLVE],
                [true, true, true, false],
            ),
        ];

        for (allowed, expected) in scenarios {
            let mut checker = IncomingCmdChecker::new_any(allowed);
            for (probe, should_accept) in probes.iter().copied().zip(expected) {
                let outcome = checker.check_msg(probe);
                if should_accept {
                    assert_eq!(outcome.unwrap(), StreamStatus::Open);
                } else {
                    assert!(outcome.is_err());
                }
            }
        }
    }
}