1//! Functionality for incoming streams, opened from the other side of a circuit.
23use bitvec::prelude::*;
45use super::{AnyCmdChecker, DataStream, StreamReader, StreamStatus};
6use crate::circuit::ClientCircSyncView;
7use crate::memquota::StreamAccount;
8use crate::tunnel::reactor::CloseStreamBehavior;
9use crate::tunnel::StreamTarget;
10use crate::{Error, Result};
11use derive_deftly::Deftly;
12use oneshot_fused_workaround as oneshot;
13use tor_cell::relaycell::{msg, RelayCmd, UnparsedRelayMsg};
14use tor_cell::restricted_msg;
15use tor_error::internal;
16use tor_memquota::derive_deftly_template_HasMemoryCost;
17use tor_rtcompat::DynTimeProvider;
/// A pending request from the other end of the circuit for us to open a new
/// stream.
///
/// Exits, directory caches, and onion services expect to receive these; others
/// do not.
///
/// On receiving one of these objects, the party handling it should accept it or
/// reject it. If it is dropped without being explicitly handled, a reject
/// message will be sent anyway.
#[derive(Debug)]
pub struct IncomingStream {
    /// The runtime's time provider.
    ///
    /// Handed over to the resulting [`DataStream`] when the request is accepted.
    time_provider: DynTimeProvider,
    /// The message that the client sent us to begin the stream.
    request: IncomingStreamRequest,
    /// The information that we'll use to wire up the stream, if it is accepted.
    ///
    /// This is also the handle through which the reply is sent on acceptance,
    /// and through which the pending stream is closed on rejection or discard.
    stream: StreamTarget,
    /// The underlying `StreamReader`.
    ///
    /// Handed over to the resulting [`DataStream`] when the request is accepted.
    reader: StreamReader,
    /// The memory quota account that should be used for this stream's data.
    memquota: StreamAccount,
}
4142impl IncomingStream {
43/// Create a new `IncomingStream`.
44pub(crate) fn new(
45 time_provider: DynTimeProvider,
46 request: IncomingStreamRequest,
47 stream: StreamTarget,
48 reader: StreamReader,
49 memquota: StreamAccount,
50 ) -> Self {
51Self {
52 time_provider,
53 request,
54 stream,
55 reader,
56 memquota,
57 }
58 }
5960/// Return the underlying message that was used to try to begin this stream.
61pub fn request(&self) -> &IncomingStreamRequest {
62&self.request
63 }
6465/// Accept this stream as a new [`DataStream`], and send the client a
66 /// message letting them know the stream was accepted.
67pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
68let Self {
69 time_provider,
70 request,
71mut stream,
72 reader,
73 memquota,
74 } = self;
7576match request {
77 IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
78 stream.send(message.into()).await?;
79Ok(DataStream::new_connected(
80 time_provider,
81 reader,
82 stream,
83 memquota,
84 ))
85 }
86 IncomingStreamRequest::Resolve(_) => {
87Err(internal!("Cannot accept data on a RESOLVE stream").into())
88 }
89 }
90 }
9192/// Reject this request and send an error message to the client.
93pub async fn reject(mut self, message: msg::End) -> Result<()> {
94let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
9596 rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
97 }
9899/// Reject this request and possibly send an error message to the client.
100 ///
101 /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
102fn reject_inner(
103&mut self,
104 message: CloseStreamBehavior,
105 ) -> Result<oneshot::Receiver<Result<()>>> {
106self.stream.close_pending(message)
107 }
108109/// Ignore this request without replying to the client.
110 ///
111 /// (If you drop an [`IncomingStream`] without calling `accept_data`,
112 /// `reject`, or this method, the drop handler will cause it to be
113 /// rejected.)
114pub async fn discard(mut self) -> Result<()> {
115let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
116117 rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
118 }
119}
120121// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
122// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
123// circuit reactor will see a close on its mpsc::Receiver, and the circuit
124// reactor will itself send an End.
restricted_msg! {
    /// The allowed incoming messages on an `IncomingStream`.
    ///
    /// This is the type that messages on an incoming stream are decoded into
    /// before being discarded or handed to the application.
    // NOTE(review): each variant is matched elsewhere in this file with a single
    // payload (e.g. `Begin(_)`); presumably `restricted_msg!` fills in the
    // matching type from `tor_cell::relaycell::msg` -- confirm against the macro.
    #[derive(Clone, Debug, Deftly)]
    #[derive_deftly(HasMemoryCost)]
    #[non_exhaustive]
    pub enum IncomingStreamRequest: RelayMsg {
        /// A BEGIN message.
        Begin,
        /// A BEGIN_DIR message.
        BeginDir,
        /// A RESOLVE message.
        Resolve,
    }
}
/// Bit-vector used to represent a list of permitted commands.
///
/// This is cheaper and faster than using a vec, and avoids side-channel
/// attacks.
///
/// Bits are indexed by the `u8` value of a [`RelayCmd`]; with 256 bits, every
/// possible `u8` value has a slot, so indexing by a command can never go out
/// of range.
type RelayCmdSet = bitvec::BitArr!(for 256);
/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
/// have a non-zero stream ID.
#[derive(Debug)]
pub(crate) struct IncomingCmdChecker {
    /// The "begin" commands that can be received on this type of circuit:
    ///
    ///   * onion service circuits only accept `BEGIN`
    ///   * all relay circuits accept `BEGIN_DIR`
    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
    ///     as well
    ///
    /// A set bit at the index given by a command's `u8` value means that
    /// command is permitted; a clear bit means it is rejected.
    allow_commands: RelayCmdSet,
}
160161impl IncomingCmdChecker {
162/// Create a new boxed `IncomingCmdChecker`.
163pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
164let mut array = BitArray::ZERO;
165for c in allow_commands {
166 array.set(u8::from(*c) as usize, true);
167 }
168 Box::new(Self {
169 allow_commands: array,
170 })
171 }
172}
173174impl super::CmdChecker for IncomingCmdChecker {
175fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
176if self.allow_commands[u8::from(msg.cmd()) as usize] {
177Ok(StreamStatus::Open)
178 } else {
179Err(Error::StreamProto(format!(
180"Unexpected {} on incoming stream",
181 msg.cmd()
182 )))
183 }
184 }
185186fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
187let _ = msg
188 .decode::<IncomingStreamRequest>()
189 .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
190191Ok(())
192 }
193}
/// A callback that can check whether a given stream request is acceptable
/// immediately on its receipt.
///
/// This should only be used for checks that need to be done immediately, with a
/// view of the state of the circuit. Any other checks should, if possible, be
/// done on the [`IncomingStream`] objects as they are received.
pub trait IncomingStreamRequestFilter: Send + 'static {
    /// Check an incoming stream request, and decide what to do with it.
    ///
    /// Implementations of this function should inspect the request in `ctx`
    /// (and, where needed, the circuit view in `circ`) and return the
    /// [`IncomingStreamRequestDisposition`] that says how the request is to be
    /// handled.
    // NOTE(review): the sentence above was unfinished in the original text. How
    // an `Err` return is treated is up to the caller, which is not visible in
    // this file -- confirm and document it here.
    fn disposition(
        &mut self,
        ctx: &IncomingStreamRequestContext<'_>,
        circ: &ClientCircSyncView<'_>,
    ) -> Result<IncomingStreamRequestDisposition>;
}
/// What action to take with an incoming stream request.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum IncomingStreamRequestDisposition {
    /// Accept the request (for now) and pass it to the mpsc::Receiver
    /// that is yielding them as [`IncomingStream`].
    Accept,
    /// Reject the request, and close the circuit on which it was received.
    CloseCircuit,
    /// Reject the request and send an END message.
    RejectRequest(msg::End),
}
/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
///
/// The context only borrows the request for the lifetime `'a`; it does not
/// own it.
pub struct IncomingStreamRequestContext<'a> {
    /// The request message itself.
    pub(crate) request: &'a IncomingStreamRequest,
}
230231impl<'a> IncomingStreamRequestContext<'a> {
232/// Return a reference to the message used to request this stream.
233pub fn request(&self) -> &'a IncomingStreamRequest {
234self.request
235 }
236}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_cell::relaycell::{
        msg::{Begin, BeginDir, Data, Resolve},
        AnyRelayMsgOuter, RelayCellFormat,
    };

    use super::*;

    /// Check that an `IncomingCmdChecker` admits exactly the commands it was
    /// built with, and rejects everything else.
    #[test]
    fn incoming_cmd_checker() {
        // Encode an `AnyRelayMsg` into a V0 cell body, then read it back as an
        // `UnparsedRelayMsg`.
        let to_unparsed = |relay_msg| {
            let encoded = AnyRelayMsgOuter::new(None, relay_msg)
                .encode(RelayCellFormat::V0, &mut rand::rng())
                .unwrap();
            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, encoded).unwrap()
        };
        let begin = to_unparsed(Begin::new("allium.example.com", 443, 0).unwrap().into());
        let begin_dir = to_unparsed(BeginDir::default().into());
        let resolve = to_unparsed(Resolve::new("allium.example.com").into());
        let data = to_unparsed(Data::new(&[1, 2, 3]).unwrap().into());

        // With nothing permitted, every message is refused.
        {
            let mut checker = IncomingCmdChecker::new_any(&[]);
            let refused = [&begin, &begin_dir, &resolve, &data];
            for m in refused {
                assert!(checker.check_msg(m).is_err());
            }
        }

        // With only BEGIN permitted, BEGIN passes and the rest are refused.
        {
            let mut checker = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
            assert_eq!(checker.check_msg(&begin).unwrap(), StreamStatus::Open);
            let refused = [&begin_dir, &resolve, &data];
            for m in refused {
                assert!(checker.check_msg(m).is_err());
            }
        }

        // With all three "begin"-style commands permitted, only DATA is refused.
        {
            let permitted = [RelayCmd::BEGIN, RelayCmd::BEGIN_DIR, RelayCmd::RESOLVE];
            let mut checker = IncomingCmdChecker::new_any(&permitted);
            let admitted = [&begin, &begin_dir, &resolve];
            for m in admitted {
                assert_eq!(checker.check_msg(m).unwrap(), StreamStatus::Open);
            }
            assert!(checker.check_msg(&data).is_err());
        }
    }
}