1//! Functionality for incoming streams, opened from the other side of a circuit.
23use bitvec::prelude::*;
45use super::{AnyCmdChecker, DataStream, StreamStatus};
6use crate::circuit::{ClientCircSyncView, StreamComponents};
7use crate::tunnel::reactor::CloseStreamBehavior;
8use crate::{Error, Result};
9use derive_deftly::Deftly;
10use oneshot_fused_workaround as oneshot;
11use tor_cell::relaycell::{msg, RelayCmd, UnparsedRelayMsg};
12use tor_cell::restricted_msg;
13use tor_error::internal;
14use tor_memquota::derive_deftly_template_HasMemoryCost;
15use tor_rtcompat::DynTimeProvider;
1617/// A pending request from the other end of the circuit for us to open a new
18/// stream.
19///
20/// Exits, directory caches, and onion services expect to receive these; others
21/// do not.
22///
23/// On receiving one of these objects, the party handling it should accept it or
24/// reject it. If it is dropped without being explicitly handled, a reject
25/// message will be sent anyway.
26#[derive(Debug)]
27pub struct IncomingStream {
28/// The runtime's time provider.
29time_provider: DynTimeProvider,
30/// The message that the client sent us to begin the stream.
31request: IncomingStreamRequest,
32/// Stream components used to assemble the [`DataStream`].
33components: StreamComponents,
34}
3536impl IncomingStream {
37/// Create a new `IncomingStream`.
38pub(crate) fn new(
39 time_provider: DynTimeProvider,
40 request: IncomingStreamRequest,
41 components: StreamComponents,
42 ) -> Self {
43Self {
44 time_provider,
45 request,
46 components,
47 }
48 }
4950/// Return the underlying message that was used to try to begin this stream.
51pub fn request(&self) -> &IncomingStreamRequest {
52&self.request
53 }
5455/// Accept this stream as a new [`DataStream`], and send the client a
56 /// message letting them know the stream was accepted.
57pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
58let Self {
59 time_provider,
60 request,
61 components:
62 StreamComponents {
63mut target,
64 stream_receiver,
65 xon_xoff_reader_ctrl,
66 memquota,
67 },
68 } = self;
6970match request {
71 IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
72 target.send(message.into()).await?;
73Ok(DataStream::new_connected(
74 time_provider,
75 stream_receiver,
76 xon_xoff_reader_ctrl,
77 target,
78 memquota,
79 ))
80 }
81 IncomingStreamRequest::Resolve(_) => {
82Err(internal!("Cannot accept data on a RESOLVE stream").into())
83 }
84 }
85 }
8687/// Reject this request and send an error message to the client.
88pub async fn reject(mut self, message: msg::End) -> Result<()> {
89let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
9091 rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
92 }
9394/// Reject this request and possibly send an error message to the client.
95 ///
96 /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
97fn reject_inner(
98&mut self,
99 message: CloseStreamBehavior,
100 ) -> Result<oneshot::Receiver<Result<()>>> {
101self.components.target.close_pending(message)
102 }
103104/// Ignore this request without replying to the client.
105 ///
106 /// (If you drop an [`IncomingStream`] without calling `accept_data`,
107 /// `reject`, or this method, the drop handler will cause it to be
108 /// rejected.)
109pub async fn discard(mut self) -> Result<()> {
110let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
111112 rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
113 }
114}
115116// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
117// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
118// circuit reactor will see a close on its mpsc::Receiver, and the circuit
119// reactor will itself send an End.
120121restricted_msg! {
122/// The allowed incoming messages on an `IncomingStream`.
123#[derive(Clone, Debug, Deftly)]
124 #[derive_deftly(HasMemoryCost)]
125 #[non_exhaustive]
126pub enum IncomingStreamRequest: RelayMsg {
127/// A BEGIN message.
128Begin,
129/// A BEGIN_DIR message.
130BeginDir,
131/// A RESOLVE message.
132Resolve,
133 }
134}
135136/// Bit-vector used to represent a list of permitted commands.
137///
138/// This is cheaper and faster than using a vec, and avoids side-channel
139/// attacks.
140type RelayCmdSet = bitvec::BitArr!(for 256);
141142/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
143/// have a non-zero stream ID.
144#[derive(Debug)]
145pub(crate) struct IncomingCmdChecker {
146/// The "begin" commands that can be received on this type of circuit:
147 ///
148 /// * onion service circuits only accept `BEGIN`
149 /// * all relay circuits accept `BEGIN_DIR`
150 /// * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
151 /// * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
152 /// as well
153allow_commands: RelayCmdSet,
154}
155156impl IncomingCmdChecker {
157/// Create a new boxed `IncomingCmdChecker`.
158pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
159let mut array = BitArray::ZERO;
160for c in allow_commands {
161 array.set(u8::from(*c) as usize, true);
162 }
163 Box::new(Self {
164 allow_commands: array,
165 })
166 }
167}
168169impl super::CmdChecker for IncomingCmdChecker {
170fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
171if self.allow_commands[u8::from(msg.cmd()) as usize] {
172Ok(StreamStatus::Open)
173 } else {
174Err(Error::StreamProto(format!(
175"Unexpected {} on incoming stream",
176 msg.cmd()
177 )))
178 }
179 }
180181fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
182let _ = msg
183 .decode::<IncomingStreamRequest>()
184 .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
185186Ok(())
187 }
188}
189190/// A callback that can check whether a given stream request is acceptable
191/// immediately on its receipt.
192///
193/// This should only be used for checks that need to be done immediately, with a
194/// view of the state of the circuit. Any other checks should, if possible, be
195/// done on the [`IncomingStream`] objects as they are received.
196pub trait IncomingStreamRequestFilter: Send + 'static {
197/// Check an incoming stream request, and decide what to do with it.
198 ///
199 /// Implementations of this function should
200fn disposition(
201&mut self,
202 ctx: &IncomingStreamRequestContext<'_>,
203 circ: &ClientCircSyncView<'_>,
204 ) -> Result<IncomingStreamRequestDisposition>;
205}
206207/// What action to take with an incoming stream request.
208#[derive(Clone, Debug)]
209#[non_exhaustive]
210pub enum IncomingStreamRequestDisposition {
211/// Accept the request (for now) and pass it to the mpsc::Receiver
212 /// that is yielding them as [`IncomingStream``
213Accept,
214/// Rejected the request, and close the circuit on which it was received.
215CloseCircuit,
216/// Reject the request and send an END message.
217RejectRequest(msg::End),
218}
219220/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
221pub struct IncomingStreamRequestContext<'a> {
222/// The request message itself
223pub(crate) request: &'a IncomingStreamRequest,
224}
225226impl<'a> IncomingStreamRequestContext<'a> {
227/// Return a reference to the message used to request this stream.
228pub fn request(&self) -> &'a IncomingStreamRequest {
229self.request
230 }
231}
232233#[cfg(test)]
234mod test {
235// @@ begin test lint list maintained by maint/add_warning @@
236#![allow(clippy::bool_assert_comparison)]
237 #![allow(clippy::clone_on_copy)]
238 #![allow(clippy::dbg_macro)]
239 #![allow(clippy::mixed_attributes_style)]
240 #![allow(clippy::print_stderr)]
241 #![allow(clippy::print_stdout)]
242 #![allow(clippy::single_char_pattern)]
243 #![allow(clippy::unwrap_used)]
244 #![allow(clippy::unchecked_duration_subtraction)]
245 #![allow(clippy::useless_vec)]
246 #![allow(clippy::needless_pass_by_value)]
247//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
248249use tor_cell::relaycell::{
250 msg::{Begin, BeginDir, Data, Resolve},
251 AnyRelayMsgOuter, RelayCellFormat,
252 };
253254use super::*;
255256#[test]
257fn incoming_cmd_checker() {
258// Convert an AnyRelayMsg to an UnparsedRelayCell.
259let u = |msg| {
260let body = AnyRelayMsgOuter::new(None, msg)
261 .encode(RelayCellFormat::V0, &mut rand::rng())
262 .unwrap();
263 UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
264 };
265let begin = u(Begin::new("allium.example.com", 443, 0).unwrap().into());
266let begin_dir = u(BeginDir::default().into());
267let resolve = u(Resolve::new("allium.example.com").into());
268let data = u(Data::new(&[1, 2, 3]).unwrap().into());
269270 {
271let mut cc_none = IncomingCmdChecker::new_any(&[]);
272for m in [&begin, &begin_dir, &resolve, &data] {
273assert!(cc_none.check_msg(m).is_err());
274 }
275 }
276277 {
278let mut cc_begin = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
279assert_eq!(cc_begin.check_msg(&begin).unwrap(), StreamStatus::Open);
280for m in [&begin_dir, &resolve, &data] {
281assert!(cc_begin.check_msg(m).is_err());
282 }
283 }
284285 {
286let mut cc_any = IncomingCmdChecker::new_any(&[
287 RelayCmd::BEGIN,
288 RelayCmd::BEGIN_DIR,
289 RelayCmd::RESOLVE,
290 ]);
291for m in [&begin, &begin_dir, &resolve] {
292assert_eq!(cc_any.check_msg(m).unwrap(), StreamStatus::Open);
293 }
294assert!(cc_any.check_msg(&data).is_err());
295 }
296 }
297}