tor_proto/tunnel/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
3//! Right now, we only implement "client circuits" -- also sometimes
4//! called "origin circuits".  A client circuit is one that is
5//! constructed by this Tor instance, and used in its own behalf to
6//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_circ] method.  This yields
14//! a [PendingClientCirc] object that won't become live until you call
15//! one of the methods
16//! (typically [`PendingClientCirc::create_firsthop`])
17//! that extends it to its first hop.  After you've
18//! done that, you can call [`ClientCirc::extend`] on the circuit to
19//! build it into a multi-hop circuit.  Finally, you can use
20//! [ClientCirc::begin_stream] to get a Stream object that can be used
21//! for anonymized data.
22//!
23//! # Implementation
24//!
25//! Each open circuit has a corresponding Reactor object that runs in
26//! an asynchronous task, and manages incoming cells from the
27//! circuit's upstream channel.  These cells are either RELAY cells or
28//! DESTROY cells.  DESTROY cells are handled immediately.
29//! RELAY cells are either for a particular stream, in which case they
30//! get forwarded to a RawCellStream object, or for no particular stream,
31//! in which case they are considered "meta" cells (like EXTENDED2)
32//! that should only get accepted if something is waiting for them.
33//!
34//! # Limitations
35//!
36//! This is client-only.
37
38pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::congestion::params::CongestionControlParams;
51use crate::crypto::cell::HopNum;
52use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53use crate::memquota::{CircuitAccount, SpecificAccount as _};
54use crate::stream::{
55    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56    StreamReader,
57};
58use crate::tunnel::circuit::celltypes::*;
59use crate::tunnel::reactor::CtrlCmd;
60use crate::tunnel::reactor::{
61    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62};
63use crate::tunnel::{LegId, StreamTarget, TargetHop};
64use crate::util::skew::ClockSkew;
65use crate::{Error, ResolveError, Result};
66use educe::Educe;
67use path::HopDetail;
68use tor_cell::{
69    chancell::CircId,
70    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
71};
72
73use tor_error::{bad_api_usage, internal, into_internal};
74use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
75use tor_protover::named;
76
77pub use crate::crypto::binding::CircuitBinding;
78pub use crate::memquota::StreamAccount;
79pub use crate::tunnel::circuit::unique_id::UniqId;
80
81#[cfg(feature = "hs-service")]
82use {
83    crate::stream::{IncomingCmdChecker, IncomingStream},
84    crate::tunnel::reactor::StreamReqInfo,
85};
86
87use futures::channel::mpsc;
88use oneshot_fused_workaround as oneshot;
89
90use crate::congestion::sendme::StreamRecvWindow;
91use crate::DynTimeProvider;
92use futures::FutureExt as _;
93use std::collections::HashMap;
94use std::net::IpAddr;
95use std::sync::{Arc, Mutex};
96use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
97
98use crate::crypto::handshake::ntor::NtorPublicKey;
99
100pub use path::{Path, PathEntry};
101
102/// The size of the buffer for communication between `ClientCirc` and its reactor.
103pub const CIRCUIT_BUFFER_SIZE: usize = 128;
104
105#[cfg(feature = "send-control-msg")]
106use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
107
108pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
109#[cfg(feature = "send-control-msg")]
110#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
111pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
112
113/// MPSC queue relating to a stream (either inbound or outbound), sender
114pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
115/// MPSC queue relating to a stream (either inbound or outbound), receiver
116pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
117
118/// MPSC queue for inbound data on its way from channel to circuit, sender
119pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
120/// MPSC queue for inbound data on its way from channel to circuit, receiver
121pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
122
123#[derive(Debug)]
124/// A circuit that we have constructed over the Tor network.
125///
126/// # Circuit life cycle
127///
128/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
129/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
130/// one of these, you invoke one of its `create_firsthop` methods (typically
131/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
132/// [`create_firsthop()`](PendingClientCirc::create_firsthop)).
133/// Then, to add more hops to the circuit, you can call
134/// [`extend()`](ClientCirc::extend) on it.
135///
136/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
137/// `tor-proto` are probably not what you need.
138///
139/// After a circuit is created, it will persist until it is closed in one of
140/// five ways:
141///    1. A remote error occurs.
142///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
143///       circuit.
144///    3. The circuit's channel is closed.
145///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
146///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
147///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
148///       circuit from closing until all those streams have gone away.)
149///
150/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
151/// will just be unusable for most purposes.  Most operations on it will fail
152/// with an error.
153//
154// Effectively, this struct contains two Arcs: one for `path` and one for
155// `control` (which surely has something Arc-like in it).  We cannot unify
156// these by putting a single Arc around the whole struct, and passing
157// an Arc strong reference to the `Reactor`, because then `control` would
158// not be dropped when the last user of the circuit goes away.  We could
159// make the reactor have a weak reference but weak references are more
160// expensive to dereference.
161//
162// Because of the above, cloning this struct is always going to involve
163// two atomic refcount changes/checks.  Wrapping it in another Arc would
164// be overkill.
165//
166pub struct ClientCirc {
167    /// Mutable state shared with the `Reactor`.
168    mutable: Arc<TunnelMutableState>,
169    /// A unique identifier for this circuit.
170    unique_id: UniqId,
171    /// Channel to send control messages to the reactor.
172    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
173    /// Channel to send commands to the reactor.
174    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
175    /// A future that resolves to Cancelled once the reactor is shut down,
176    /// meaning that the circuit is closed.
177    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
178    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
179    /// For testing purposes: the CircId, for use in peek_circid().
180    #[cfg(test)]
181    circid: CircId,
182    /// Memory quota account
183    memquota: CircuitAccount,
184    /// Time provider
185    time_provider: DynTimeProvider,
186}
187
188/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
189///
190/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
191/// but it is currently the best option we have for sharing
192/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
193/// In practice, these mutexes won't be accessed very often
194/// (they're accessed for writing when a circuit is extended,
195/// and for reading by the various `ClientCirc` APIs),
196/// so they shouldn't really impact performance.
197///
198/// Alternatively, the circuit state information could be shared
199/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
200/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
201/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
202/// asynchronous, which will significantly complicate their callsites,
203/// which would in turn need to be made async too.
204///
205/// We should revisit this decision at some point, and decide whether an async API
206/// would be preferable.
207#[derive(Debug, Default)]
208pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, (LegId, Arc<MutableState>)>>);
209
210impl TunnelMutableState {
211    /// Add the [`MutableState`] of a circuit.
212    pub(super) fn insert(&self, unique_id: UniqId, leg: LegId, mutable: Arc<MutableState>) {
213        #[allow(unused)] // unused in non-debug builds
214        let state = self
215            .0
216            .lock()
217            .expect("lock poisoned")
218            .insert(unique_id, (leg, mutable));
219
220        debug_assert!(state.is_none());
221    }
222
223    /// Remove the [`MutableState`] of a circuit.
224    pub(super) fn remove(&self, unique_id: UniqId) {
225        #[allow(unused)] // unused in non-debug builds
226        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
228        debug_assert!(state.is_some());
229    }
230
231    /// Return a [`Path`] object describing all the hops in the specified circuit.
232    ///
233    /// See [`MutableState::path`].
234    fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
235        let lock = self.0.lock().expect("lock poisoned");
236        let (_leg, mutable) = lock
237            .get(&unique_id)
238            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
239
240        Ok(mutable.path())
241    }
242
243    /// Return a description of the first hop of this circuit.
244    ///
245    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
246    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
247    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
248        let lock = self.0.lock().expect("lock poisoned");
249        let (_leg, mutable) = lock
250            .get(&unique_id)
251            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
252
253        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
254            path::HopDetail::Relay(r) => r,
255            #[cfg(feature = "hs-common")]
256            path::HopDetail::Virtual => {
257                panic!("somehow made a circuit with a virtual first hop.")
258            }
259        });
260
261        Ok(first_hop)
262    }
263
264    /// Return the [`HopNum`] of the last hop of the specified circuit.
265    ///
266    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
267    ///
268    /// See [`MutableState::last_hop_num`].
269    fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
270        let lock = self.0.lock().expect("lock poisoned");
271        let (_leg, mutable) = lock
272            .get(&unique_id)
273            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
274
275        Ok(mutable.last_hop_num())
276    }
277
278    /// Return the number of hops in the specified circuit.
279    ///
280    /// See [`MutableState::n_hops`].
281    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
282        let lock = self.0.lock().expect("lock poisoned");
283        let (_leg, mutable) = lock
284            .get(&unique_id)
285            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
286
287        Ok(mutable.n_hops())
288    }
289
290    /// Return the cryptographic material used to prove knowledge of a shared
291    /// secret with with `hop` on the circuit with the specified `unique_id`.
292    fn binding_key(&self, unique_id: UniqId, hop: HopNum) -> Result<Option<CircuitBinding>> {
293        let lock = self.0.lock().expect("lock poisoned");
294        let (_leg, mutable) = lock
295            .get(&unique_id)
296            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297
298        Ok(mutable.binding_key(hop))
299    }
300}
301
302/// The mutable state of a circuit.
303#[derive(Educe, Default)]
304#[educe(Debug)]
305pub(super) struct MutableState(Mutex<CircuitState>);
306
307impl MutableState {
308    /// Add a hop to the path of this circuit.
309    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
310        let mut mutable = self.0.lock().expect("poisoned lock");
311        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
312        mutable.binding.push(binding);
313    }
314
315    /// Get a copy of the circuit's current [`path::Path`].
316    pub(super) fn path(&self) -> Arc<path::Path> {
317        let mutable = self.0.lock().expect("poisoned lock");
318        Arc::clone(&mutable.path)
319    }
320
321    /// Return the cryptographic material used to prove knowledge of a shared
322    /// secret with with `hop`.
323    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
324        let mutable = self.0.lock().expect("poisoned lock");
325
326        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
327        // NOTE: I'm not thrilled to have to copy this information, but we use
328        // it very rarely, so it's not _that_ bad IMO.
329    }
330
331    /// Return a description of the first hop of this circuit.
332    fn first_hop(&self) -> Option<HopDetail> {
333        let mutable = self.0.lock().expect("poisoned lock");
334        mutable.path.first_hop()
335    }
336
337    /// Return the [`HopNum`] of the last hop of this circuit.
338    ///
339    /// NOTE: This function will return the [`HopNum`] of the hop
340    /// that is _currently_ the last. If there is an extend operation in progress,
341    /// the currently pending hop may or may not be counted, depending on whether
342    /// the extend operation finishes before this call is done.
343    fn last_hop_num(&self) -> Option<HopNum> {
344        let mutable = self.0.lock().expect("poisoned lock");
345        mutable.path.last_hop_num()
346    }
347
348    /// Return the number of hops in this circuit.
349    ///
350    /// NOTE: This function will currently return only the number of hops
351    /// _currently_ in the circuit. If there is an extend operation in progress,
352    /// the currently pending hop may or may not be counted, depending on whether
353    /// the extend operation finishes before this call is done.
354    fn n_hops(&self) -> usize {
355        let mutable = self.0.lock().expect("poisoned lock");
356        mutable.path.n_hops()
357    }
358}
359
360/// The shared state of a circuit.
361#[derive(Educe, Default)]
362#[educe(Debug)]
363pub(super) struct CircuitState {
364    /// Information about this circuit's path.
365    ///
366    /// This is stored in an Arc so that we can cheaply give a copy of it to
367    /// client code; when we need to add a hop (which is less frequent) we use
368    /// [`Arc::make_mut()`].
369    path: Arc<path::Path>,
370
371    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
372    /// in the circuit's path.
373    ///
374    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
375    /// fair chance that this will change in the future, and I don't want other
376    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
377    /// an `Option`.
378    #[educe(Debug(ignore))]
379    binding: Vec<Option<CircuitBinding>>,
380}
381
382/// A ClientCirc that needs to send a create cell and receive a created* cell.
383///
384/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
385/// to negotiate the cryptographic handshake with the first hop.
386pub struct PendingClientCirc {
387    /// A oneshot receiver on which we'll receive a CREATED* cell,
388    /// or a DESTROY cell.
389    recvcreated: oneshot::Receiver<CreateResponse>,
390    /// The ClientCirc object that we can expose on success.
391    circ: Arc<ClientCirc>,
392}
393
394/// Description of the network's current rules for building circuits.
395#[non_exhaustive]
396#[derive(Clone, Debug)]
397pub struct CircParameters {
398    /// Whether we should include ed25519 identities when we send
399    /// EXTEND2 cells.
400    pub extend_by_ed25519_id: bool,
401    /// Congestion control parameters for this circuit.
402    pub ccontrol: CongestionControlParams,
403}
404
405#[cfg(test)]
406impl std::default::Default for CircParameters {
407    fn default() -> Self {
408        Self {
409            extend_by_ed25519_id: true,
410            ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
411        }
412    }
413}
414
415impl CircParameters {
416    /// Constructor
417    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
418        Self {
419            extend_by_ed25519_id,
420            ccontrol,
421        }
422    }
423}
424
425impl ClientCirc {
426    /// Return a description of the first hop of this circuit.
427    ///
428    /// # Panics
429    ///
430    /// Panics if there is no first hop.  (This should be impossible outside of
431    /// the tor-proto crate, but within the crate it's possible to have a
432    /// circuit with no hops.)
433    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
434        Ok(self
435            .mutable
436            .first_hop(self.unique_id)
437            .map_err(|_| Error::CircuitClosed)?
438            .expect("called first_hop on an un-constructed circuit"))
439    }
440
441    /// Return the [`HopNum`] of the last hop of this circuit.
442    ///
443    /// Returns an error if there is no last hop.  (This should be impossible outside of the
444    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
445    ///
446    /// NOTE: This function will return the [`HopNum`] of the hop
447    /// that is _currently_ the last. If there is an extend operation in progress,
448    /// the currently pending hop may or may not be counted, depending on whether
449    /// the extend operation finishes before this call is done.
450    pub fn last_hop_num(&self) -> Result<HopNum> {
451        Ok(self
452            .mutable
453            .last_hop_num(self.unique_id)?
454            .ok_or_else(|| internal!("no last hop index"))?)
455    }
456
457    /// Return a [`Path`] object describing all the hops in this circuit.
458    ///
459    /// Note that this `Path` is not automatically updated if the circuit is
460    /// extended.
461    pub fn path_ref(&self) -> Result<Arc<Path>> {
462        self.mutable
463            .path_ref(self.unique_id)
464            .map_err(|_| Error::CircuitClosed)
465    }
466
467    /// Get the clock skew claimed by the first hop of the circuit.
468    ///
469    /// See [`Channel::clock_skew()`].
470    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
471        let (tx, rx) = oneshot::channel();
472
473        self.control
474            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
475            .map_err(|_| Error::CircuitClosed)?;
476
477        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
478    }
479
480    /// Return a reference to this circuit's memory quota account
481    pub fn mq_account(&self) -> &CircuitAccount {
482        &self.memquota
483    }
484
485    /// Return the cryptographic material used to prove knowledge of a shared
486    /// secret with with `hop`.
487    ///
488    /// See [`CircuitBinding`] for more information on how this is used.
489    ///
490    /// Return None if we have no circuit binding information for the hop, or if
491    /// the hop does not exist.
492    pub fn binding_key(&self, hop: HopNum) -> Result<Option<CircuitBinding>> {
493        self.mutable
494            .binding_key(self.unique_id, hop)
495            .map_err(|_| Error::CircuitClosed)
496    }
497
498    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
499    ///
500    /// To use this:
501    ///
502    ///  0. Create an inter-task channel you'll use to receive
503    ///     the outcome of your conversation,
504    ///     and bundle it into a [`MsgHandler`].
505    ///
506    ///  1. Call `start_conversation`.
507    ///     This will install a your handler, for incoming messages,
508    ///     and send the outgoing message (if you provided one).
509    ///     After that, each message on the circuit
510    ///     that isn't handled by the core machinery
511    ///     is passed to your provided `reply_handler`.
512    ///
513    ///  2. Possibly call `send_msg` on the [`Conversation`],
514    ///     from the call site of `start_conversation`,
515    ///     possibly multiple times, from time to time,
516    ///     to send further desired messages to the peer.
517    ///
518    ///  3. In your [`MsgHandler`], process the incoming messages.
519    ///     You may respond by
520    ///     sending additional messages
521    ///     When the protocol exchange is finished,
522    ///     `MsgHandler::handle_msg` should return
523    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
524    ///
525    /// If you don't need the `Conversation` to send followup messages,
526    /// you may simply drop it,
527    /// and rely on the responses you get from your handler,
528    /// on the channel from step 0 above.
529    /// Your handler will remain installed and able to process incoming messages
530    /// until it returns `ConversationFinished`.
531    ///
532    /// (If you don't want to accept any replies at all, it may be
533    /// simpler to use [`ClientCirc::send_raw_msg`].)
534    ///
535    /// Note that it is quite possible to use this function to violate the tor
536    /// protocol; most users of this API will not need to call it.  It is used
537    /// to implement most of the onion service handshake.
538    ///
539    /// # Limitations
540    ///
541    /// Only one conversation may be active at any one time,
542    /// for any one circuit.
543    /// This generally means that this function should not be called
544    /// on a circuit which might be shared with anyone else.
545    ///
546    /// Likewise, it is forbidden to try to extend the circuit,
547    /// while the conversation is in progress.
548    ///
549    /// After the conversation has finished, the circuit may be extended.
550    /// Or, `start_conversation` may be called again;
551    /// but, in that case there will be a gap between the two conversations,
552    /// during which no `MsgHandler` is installed,
553    /// and unexpected incoming messages would close the circuit.
554    ///
555    /// If these restrictions are violated, the circuit will be closed with an error.
556    ///
557    /// ## Precise definition of the lifetime of a conversation
558    ///
559    /// A conversation is in progress from entry to `start_conversation`,
560    /// until entry to the body of the [`MsgHandler::handle_msg`]
561    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
562    /// (*Entry* since `handle_msg` is synchronously embedded
563    /// into the incoming message processing.)
564    /// So you may start a new conversation as soon as you have the final response
565    /// via your inter-task channel from (0) above.
566    ///
567    /// The lifetime relationship of the [`Conversation`],
568    /// vs the handler returning `ConversationFinished`
569    /// is not enforced by the type system.
570    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
571    // at least while allowing sending followup messages from outside the handler.
572    //
573    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
574    //   tor-proto interface.
575    #[cfg(feature = "send-control-msg")]
576    pub async fn start_conversation(
577        &self,
578        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
579        reply_handler: impl MsgHandler + Send + 'static,
580        hop_num: HopNum,
581    ) -> Result<Conversation<'_>> {
582        let handler = Box::new(UserMsgHandler::new(hop_num, reply_handler));
583        let conversation = Conversation(self);
584        conversation.send_internal(msg, Some(handler)).await?;
585        Ok(conversation)
586    }
587
588    /// Send an ad-hoc message to a given hop on the circuit, without expecting
589    /// a reply.
590    ///
591    /// (If you want to handle one or more possible replies, see
592    /// [`ClientCirc::start_conversation`].)
593    #[cfg(feature = "send-control-msg")]
594    pub async fn send_raw_msg(
595        &self,
596        msg: tor_cell::relaycell::msg::AnyRelayMsg,
597        hop_num: HopNum,
598    ) -> Result<()> {
599        let (sender, receiver) = oneshot::channel();
600        let ctrl_msg = CtrlMsg::SendMsg {
601            hop_num,
602            msg,
603            sender,
604        };
605        self.control
606            .unbounded_send(ctrl_msg)
607            .map_err(|_| Error::CircuitClosed)?;
608
609        receiver.await.map_err(|_| Error::CircuitClosed)?
610    }
611
612    /// Tell this circuit to begin allowing the final hop of the circuit to try
613    /// to create new Tor streams, and to return those pending requests in an
614    /// asynchronous stream.
615    ///
616    /// Ordinarily, these requests are rejected.
617    ///
618    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
619    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
620    /// an error.
621    ///
622    /// After this method has been called on a circuit, the circuit is expected
623    /// to receive requests of this type indefinitely, until it is finally closed.
624    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
625    ///
626    /// Only onion services (and eventually) exit relays should call this
627    /// method.
628    //
629    // TODO: Someday, we might want to allow a stream request handler to be
630    // un-registered.  However, nothing in the Tor protocol requires it.
631    //
632    // TODO(conflux): when conflux is ready, we need to update these docs to say
633    // that on a multipath circuit, this function **must** be called on the "main"
634    // (initial) circuit into which all of the other circuit legs are linked,
635    // or on the resulting ClientTunnel itself.
636    //
637    // Any incoming request handlers installed on the other circuits
638    // (which are are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
639    // will be discarded (along with the reactor of that circuit)
640    #[cfg(feature = "hs-service")]
641    pub async fn allow_stream_requests(
642        self: &Arc<ClientCirc>,
643        allow_commands: &[tor_cell::relaycell::RelayCmd],
644        hop_num: HopNum,
645        filter: impl crate::stream::IncomingStreamRequestFilter,
646    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
647        use futures::stream::StreamExt;
648
649        use crate::tunnel::HopLocation;
650
651        /// The size of the channel receiving IncomingStreamRequestContexts.
652        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
653
654        let time_prov = self.time_provider.clone();
655        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
656        let (incoming_sender, incoming_receiver) =
657            MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
658        let (tx, rx) = oneshot::channel();
659
660        self.command
661            .unbounded_send(CtrlCmd::AwaitStreamRequest {
662                cmd_checker,
663                incoming_sender,
664                hop_num,
665                done: tx,
666                filter: Box::new(filter),
667            })
668            .map_err(|_| Error::CircuitClosed)?;
669
670        // Check whether the AwaitStreamRequest was processed successfully.
671        rx.await.map_err(|_| Error::CircuitClosed)??;
672
673        // TODO(conflux): maybe this function should take a HopLocation instead of a HopNum,
674        // but we currently cannot resolve `HopLocation`s outside of the reactor
675        // (and we need the resolved HopNum to assert the stream request indeed came from the right hop below).
676        let allowed_hop_num = hop_num;
677
678        let circ = Arc::clone(self);
679        Ok(incoming_receiver.map(move |req_ctx| {
680            let StreamReqInfo {
681                req,
682                stream_id,
683                hop_num,
684                leg,
685                receiver,
686                msg_tx,
687                memquota,
688                relay_cell_format,
689            } = req_ctx;
690
691            // We already enforce this in handle_incoming_stream_request; this
692            // assertion is just here to make sure that we don't ever
693            // accidentally remove or fail to enforce that check, since it is
694            // security-critical.
695            assert_eq!(allowed_hop_num, hop_num);
696
697            // TODO(conflux): figure out what this is going to look like
698            // for onion services (perhaps we should forbid this function
699            // from being called on a multipath circuit?)
700            //
701            // See also:
702            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
703            let target = StreamTarget {
704                circ: Arc::clone(&circ),
705                tx: msg_tx,
706                hop: HopLocation::Hop((leg, hop_num)),
707                stream_id,
708                relay_cell_format,
709            };
710
711            let reader = StreamReader {
712                target: target.clone(),
713                receiver,
714                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
715                ended: false,
716            };
717
718            IncomingStream::new(req, target, reader, memquota)
719        }))
720    }
721
722    /// Extend the circuit, via the most appropriate circuit extension handshake,
723    /// to the chosen `target` hop.
724    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
725    where
726        Tg: CircTarget,
727    {
728        // For now we use the simplest decision-making mechanism:
729        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
730        //
731        // This behavior is slightly different from C tor, which uses ntor v3
732        // only whenever it want to send any extension in the circuit message.
733        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
734        // want to use an extension if we can, and so it doesn't make too much
735        // sense to detect the case where we have no extensions.
736        //
737        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
738        // on the tor network, and so we cannot simply assume that everybody has it.)
739        if target
740            .protovers()
741            .supports_named_subver(named::RELAY_NTORV3)
742        {
743            self.extend_ntor_v3(target, params).await
744        } else {
745            self.extend_ntor(target, params).await
746        }
747    }
748
749    /// Extend the circuit via the ntor handshake to a new target last
750    /// hop.
751    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
752    where
753        Tg: CircTarget,
754    {
755        let key = NtorPublicKey {
756            id: *target
757                .rsa_identity()
758                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
759            pk: *target.ntor_onion_key(),
760        };
761        let mut linkspecs = target
762            .linkspecs()
763            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
764        if !params.extend_by_ed25519_id {
765            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
766        }
767
768        let (tx, rx) = oneshot::channel();
769
770        let peer_id = OwnedChanTarget::from_chan_target(target);
771        self.control
772            .unbounded_send(CtrlMsg::ExtendNtor {
773                peer_id,
774                public_key: key,
775                linkspecs,
776                params,
777                done: tx,
778            })
779            .map_err(|_| Error::CircuitClosed)?;
780
781        rx.await.map_err(|_| Error::CircuitClosed)??;
782
783        Ok(())
784    }
785
786    /// Extend the circuit via the ntor handshake to a new target last
787    /// hop.
788    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
789    where
790        Tg: CircTarget,
791    {
792        let key = NtorV3PublicKey {
793            id: *target
794                .ed_identity()
795                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
796            pk: *target.ntor_onion_key(),
797        };
798        let mut linkspecs = target
799            .linkspecs()
800            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
801        if !params.extend_by_ed25519_id {
802            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
803        }
804
805        let (tx, rx) = oneshot::channel();
806
807        let peer_id = OwnedChanTarget::from_chan_target(target);
808        self.control
809            .unbounded_send(CtrlMsg::ExtendNtorV3 {
810                peer_id,
811                public_key: key,
812                linkspecs,
813                params,
814                done: tx,
815            })
816            .map_err(|_| Error::CircuitClosed)?;
817
818        rx.await.map_err(|_| Error::CircuitClosed)??;
819
820        Ok(())
821    }
822
823    /// Extend this circuit by a single, "virtual" hop.
824    ///
825    /// A virtual hop is one for which we do not add an actual network connection
826    /// between separate hosts (such as Relays).  We only add a layer of
827    /// cryptography.
828    ///
829    /// This is used to implement onion services: the client and the service
830    /// both build a circuit to a single rendezvous point, and tell the
831    /// rendezvous point to relay traffic between their two circuits.  Having
832    /// completed a [`handshake`] out of band[^1], the parties each extend their
833    /// circuits by a single "virtual" encryption hop that represents their
834    /// shared cryptographic context.
835    ///
836    /// Once a circuit has been extended in this way, it is an error to try to
837    /// extend it in any other way.
838    ///
839    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
840    ///     client sends their half of the handshake in an ` message, and the
841    ///     service's response is inline in its `RENDEZVOUS2` message.
842    //
843    // TODO hs: let's try to enforce the "you can't extend a circuit again once
844    // it has been extended this way" property.  We could do that with internal
845    // state, or some kind of a type state pattern.
846    //
847    // TODO hs: possibly we should take a set of Protovers, and not just `Params`.
848    #[cfg(feature = "hs-common")]
849    pub async fn extend_virtual(
850        &self,
851        protocol: handshake::RelayProtocol,
852        role: handshake::HandshakeRole,
853        seed: impl handshake::KeyGenerator,
854        params: CircParameters,
855    ) -> Result<()> {
856        use self::handshake::BoxedClientLayer;
857
858        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
859        let relay_cell_format = protocol.relay_cell_format();
860
861        let BoxedClientLayer { fwd, back, binding } =
862            protocol.construct_client_layers(role, seed)?;
863
864        let (tx, rx) = oneshot::channel();
865        let message = CtrlCmd::ExtendVirtual {
866            relay_cell_format,
867            cell_crypto: (fwd, back, binding),
868            params,
869            done: tx,
870        };
871
872        self.command
873            .unbounded_send(message)
874            .map_err(|_| Error::CircuitClosed)?;
875
876        rx.await.map_err(|_| Error::CircuitClosed)?
877    }
878
879    /// Helper, used to begin a stream.
880    ///
881    /// This function allocates a stream ID, and sends the message
882    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
883    ///
884    /// The caller will typically want to see the first cell in response,
885    /// to see whether it is e.g. an END or a CONNECTED.
886    async fn begin_stream_impl(
887        self: &Arc<ClientCirc>,
888        begin_msg: AnyRelayMsg,
889        cmd_checker: AnyCmdChecker,
890    ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
891        // TODO: Possibly this should take a hop, rather than just
892        // assuming it's the last hop.
893        let hop = TargetHop::LastHop;
894
895        let time_prov = self.time_provider.clone();
896
897        let memquota = StreamAccount::new(self.mq_account())?;
898        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
899            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
900        let (tx, rx) = oneshot::channel();
901        let (msg_tx, msg_rx) =
902            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
903
904        self.control
905            .unbounded_send(CtrlMsg::BeginStream {
906                hop,
907                message: begin_msg,
908                sender,
909                rx: msg_rx,
910                done: tx,
911                cmd_checker,
912            })
913            .map_err(|_| Error::CircuitClosed)?;
914
915        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
916
917        let target = StreamTarget {
918            circ: self.clone(),
919            tx: msg_tx,
920            hop,
921            stream_id,
922            relay_cell_format,
923        };
924
925        let reader = StreamReader {
926            target: target.clone(),
927            receiver,
928            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
929            ended: false,
930        };
931
932        Ok((reader, target, memquota))
933    }
934
935    /// Start a DataStream (anonymized connection) to the given
936    /// address and port, using a BEGIN cell.
937    async fn begin_data_stream(
938        self: &Arc<ClientCirc>,
939        msg: AnyRelayMsg,
940        optimistic: bool,
941    ) -> Result<DataStream> {
942        let (reader, target, memquota) = self
943            .begin_stream_impl(msg, DataCmdChecker::new_any())
944            .await?;
945        let mut stream = DataStream::new(reader, target, memquota);
946        if !optimistic {
947            stream.wait_for_connection().await?;
948        }
949        Ok(stream)
950    }
951
952    /// Start a stream to the given address and port, using a BEGIN
953    /// cell.
954    ///
955    /// The use of a string for the address is intentional: you should let
956    /// the remote Tor relay do the hostname lookup for you.
957    pub async fn begin_stream(
958        self: &Arc<ClientCirc>,
959        target: &str,
960        port: u16,
961        parameters: Option<StreamParameters>,
962    ) -> Result<DataStream> {
963        let parameters = parameters.unwrap_or_default();
964        let begin_flags = parameters.begin_flags();
965        let optimistic = parameters.is_optimistic();
966        let target = if parameters.suppressing_hostname() {
967            ""
968        } else {
969            target
970        };
971        let beginmsg = Begin::new(target, port, begin_flags)
972            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
973        self.begin_data_stream(beginmsg.into(), optimistic).await
974    }
975
976    /// Start a new stream to the last relay in the circuit, using
977    /// a BEGIN_DIR cell.
978    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
979        // Note that we always open begindir connections optimistically.
980        // Since they are local to a relay that we've already authenticated
981        // with and built a circuit to, there should be no additional checks
982        // we need to perform to see whether the BEGINDIR will succeed.
983        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
984            .await
985    }
986
987    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
988    /// in this circuit.
989    ///
990    /// Note that this function does not check for timeouts; that's
991    /// the caller's responsibility.
992    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
993        let resolve_msg = Resolve::new(hostname);
994
995        let resolved_msg = self.try_resolve(resolve_msg).await?;
996
997        resolved_msg
998            .into_answers()
999            .into_iter()
1000            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1001                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1002                Ok(_) => None,
1003                Err(e) => Some(Err(e)),
1004            })
1005            .collect()
1006    }
1007
1008    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
1009    /// the last relay on this circuit.
1010    ///
1011    /// Note that this function does not check for timeouts; that's
1012    /// the caller's responsibility.
1013    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1014        let resolve_ptr_msg = Resolve::new_reverse(&addr);
1015
1016        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1017
1018        resolved_msg
1019            .into_answers()
1020            .into_iter()
1021            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1022                Ok(ResolvedVal::Hostname(v)) => Some(
1023                    String::from_utf8(v)
1024                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1025                ),
1026                Ok(_) => None,
1027                Err(e) => Some(Err(e)),
1028            })
1029            .collect()
1030    }
1031
1032    /// Helper: Send the resolve message, and read resolved message from
1033    /// resolve stream.
1034    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1035        let (reader, _target, memquota) = self
1036            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1037            .await?;
1038        let mut resolve_stream = ResolveStream::new(reader, memquota);
1039        resolve_stream.read_msg().await
1040    }
1041
1042    /// Shut down this circuit, along with all streams that are using it.
1043    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
1044    /// immediately after this function returns!).
1045    ///
1046    /// Note that other references to this circuit may exist.  If they
1047    /// do, they will stop working after you call this function.
1048    ///
1049    /// It's not necessary to call this method if you're just done
1050    /// with a circuit: the circuit should close on its own once nothing
1051    /// is using it any more.
1052    pub fn terminate(&self) {
1053        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
1054    }
1055
1056    /// Called when a circuit-level protocol error has occurred and the
1057    /// circuit needs to shut down.
1058    ///
1059    /// This is a separate function because we may eventually want to have
1060    /// it do more than just shut down.
1061    ///
1062    /// As with `terminate`, this function is asynchronous.
1063    pub(crate) fn protocol_error(&self) {
1064        self.terminate();
1065    }
1066
1067    /// Return true if this circuit is closed and therefore unusable.
1068    pub fn is_closing(&self) -> bool {
1069        self.control.is_closed()
1070    }
1071
1072    /// Return a process-unique identifier for this circuit.
1073    pub fn unique_id(&self) -> UniqId {
1074        self.unique_id
1075    }
1076
1077    /// Return the number of hops in this circuit.
1078    ///
1079    /// NOTE: This function will currently return only the number of hops
1080    /// _currently_ in the circuit. If there is an extend operation in progress,
1081    /// the currently pending hop may or may not be counted, depending on whether
1082    /// the extend operation finishes before this call is done.
1083    pub fn n_hops(&self) -> Result<usize> {
1084        self.mutable
1085            .n_hops(self.unique_id)
1086            .map_err(|_| Error::CircuitClosed)
1087    }
1088
1089    /// Return a future that will resolve once this circuit has closed.
1090    ///
1091    /// Note that this method does not itself cause the circuit to shut down.
1092    ///
1093    /// TODO: Perhaps this should return some kind of status indication instead
1094    /// of just ()
1095    #[cfg(feature = "experimental-api")]
1096    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
1097        self.reactor_closed_rx.clone().map(|_| ())
1098    }
1099}
1100
1101/// Handle to use during an ongoing protocol exchange with a circuit's last hop
1102///
1103/// This is obtained from [`ClientCirc::start_conversation`],
1104/// and used to send messages to the last hop relay.
1105//
1106// TODO(conflux): this should use ClientTunnel, and it should be moved into
1107// the tunnel module.
1108#[cfg(feature = "send-control-msg")]
1109#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
1110pub struct Conversation<'r>(&'r ClientCirc);
1111
1112#[cfg(feature = "send-control-msg")]
1113#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
1114impl Conversation<'_> {
1115    /// Send a protocol message as part of an ad-hoc exchange
1116    ///
1117    /// Responses are handled by the `MsgHandler` set up
1118    /// when the `Conversation` was created.
1119    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
1120        self.send_internal(Some(msg), None).await
1121    }
1122
1123    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
1124    ///
1125    /// The guts of `start_conversation` and `Conversation::send_msg`
1126    pub(crate) async fn send_internal(
1127        &self,
1128        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
1129        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1130    ) -> Result<()> {
1131        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
1132        let (sender, receiver) = oneshot::channel();
1133
1134        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
1135            msg,
1136            handler,
1137            sender,
1138        };
1139        self.0
1140            .control
1141            .unbounded_send(ctrl_msg)
1142            .map_err(|_| Error::CircuitClosed)?;
1143
1144        receiver.await.map_err(|_| Error::CircuitClosed)?
1145    }
1146}
1147
1148impl PendingClientCirc {
1149    /// Instantiate a new circuit object: used from Channel::new_circ().
1150    ///
1151    /// Does not send a CREATE* cell on its own.
1152    ///
1153    ///
1154    pub(crate) fn new(
1155        id: CircId,
1156        channel: Arc<Channel>,
1157        createdreceiver: oneshot::Receiver<CreateResponse>,
1158        input: CircuitRxReceiver,
1159        unique_id: UniqId,
1160        runtime: DynTimeProvider,
1161        memquota: CircuitAccount,
1162    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1163        let time_provider = channel.time_provider().clone();
1164        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1165            Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1166
1167        let circuit = ClientCirc {
1168            mutable,
1169            unique_id,
1170            control: control_tx,
1171            command: command_tx,
1172            reactor_closed_rx: reactor_closed_rx.shared(),
1173            #[cfg(test)]
1174            circid: id,
1175            memquota,
1176            time_provider,
1177        };
1178
1179        let pending = PendingClientCirc {
1180            recvcreated: createdreceiver,
1181            circ: Arc::new(circuit),
1182        };
1183        (pending, reactor)
1184    }
1185
1186    /// Extract the process-unique identifier for this pending circuit.
1187    pub fn peek_unique_id(&self) -> UniqId {
1188        self.circ.unique_id
1189    }
1190
1191    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1192    /// first hop of this circuit.
1193    ///
1194    /// There's no authentication in CRATE_FAST,
1195    /// so we don't need to know whom we're connecting to: we're just
1196    /// connecting to whichever relay the channel is for.
1197    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1198        let (tx, rx) = oneshot::channel();
1199        self.circ
1200            .control
1201            .unbounded_send(CtrlMsg::Create {
1202                recv_created: self.recvcreated,
1203                handshake: CircuitHandshake::CreateFast,
1204                params,
1205                done: tx,
1206            })
1207            .map_err(|_| Error::CircuitClosed)?;
1208
1209        rx.await.map_err(|_| Error::CircuitClosed)??;
1210
1211        Ok(self.circ)
1212    }
1213
1214    /// Use the most appropriate handshake to connect to the first hop of this circuit.
1215    ///
1216    /// Note that the provided 'target' must match the channel's target,
1217    /// or the handshake will fail.
1218    pub async fn create_firsthop<Tg>(
1219        self,
1220        target: &Tg,
1221        params: CircParameters,
1222    ) -> Result<Arc<ClientCirc>>
1223    where
1224        Tg: tor_linkspec::CircTarget,
1225    {
1226        // (See note in ClientCirc::extend.)
1227        if target
1228            .protovers()
1229            .supports_named_subver(named::RELAY_NTORV3)
1230        {
1231            self.create_firsthop_ntor_v3(target, params).await
1232        } else {
1233            self.create_firsthop_ntor(target, params).await
1234        }
1235    }
1236
1237    /// Use the ntor handshake to connect to the first hop of this circuit.
1238    ///
1239    /// Note that the provided 'target' must match the channel's target,
1240    /// or the handshake will fail.
1241    pub async fn create_firsthop_ntor<Tg>(
1242        self,
1243        target: &Tg,
1244        params: CircParameters,
1245    ) -> Result<Arc<ClientCirc>>
1246    where
1247        Tg: tor_linkspec::CircTarget,
1248    {
1249        let (tx, rx) = oneshot::channel();
1250
1251        self.circ
1252            .control
1253            .unbounded_send(CtrlMsg::Create {
1254                recv_created: self.recvcreated,
1255                handshake: CircuitHandshake::Ntor {
1256                    public_key: NtorPublicKey {
1257                        id: *target
1258                            .rsa_identity()
1259                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1260                        pk: *target.ntor_onion_key(),
1261                    },
1262                    ed_identity: *target
1263                        .ed_identity()
1264                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1265                },
1266                params,
1267                done: tx,
1268            })
1269            .map_err(|_| Error::CircuitClosed)?;
1270
1271        rx.await.map_err(|_| Error::CircuitClosed)??;
1272
1273        Ok(self.circ)
1274    }
1275
1276    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
1277    ///
1278    /// Assumes that the target supports ntor_v3. The caller should verify
1279    /// this before calling this function, e.g. by validating that the target
1280    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
1281    ///
1282    /// Note that the provided 'target' must match the channel's target,
1283    /// or the handshake will fail.
1284    pub async fn create_firsthop_ntor_v3<Tg>(
1285        self,
1286        target: &Tg,
1287        params: CircParameters,
1288    ) -> Result<Arc<ClientCirc>>
1289    where
1290        Tg: tor_linkspec::CircTarget,
1291    {
1292        let (tx, rx) = oneshot::channel();
1293
1294        self.circ
1295            .control
1296            .unbounded_send(CtrlMsg::Create {
1297                recv_created: self.recvcreated,
1298                handshake: CircuitHandshake::NtorV3 {
1299                    public_key: NtorV3PublicKey {
1300                        id: *target
1301                            .ed_identity()
1302                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1303                        pk: *target.ntor_onion_key(),
1304                    },
1305                },
1306                params,
1307                done: tx,
1308            })
1309            .map_err(|_| Error::CircuitClosed)?;
1310
1311        rx.await.map_err(|_| Error::CircuitClosed)??;
1312
1313        Ok(self.circ)
1314    }
1315}
1316
1317/// Convert a [`ResolvedVal`] into a Result, based on whether or not
1318/// it represents an error.
1319fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1320    match val {
1321        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1322        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1323        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1324        _ => Ok(val),
1325    }
1326}
1327
1328#[cfg(test)]
1329pub(crate) mod test {
1330    // @@ begin test lint list maintained by maint/add_warning @@
1331    #![allow(clippy::bool_assert_comparison)]
1332    #![allow(clippy::clone_on_copy)]
1333    #![allow(clippy::dbg_macro)]
1334    #![allow(clippy::mixed_attributes_style)]
1335    #![allow(clippy::print_stderr)]
1336    #![allow(clippy::print_stdout)]
1337    #![allow(clippy::single_char_pattern)]
1338    #![allow(clippy::unwrap_used)]
1339    #![allow(clippy::unchecked_duration_subtraction)]
1340    #![allow(clippy::useless_vec)]
1341    #![allow(clippy::needless_pass_by_value)]
1342    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1343
1344    use super::*;
1345    use crate::channel::OpenChanCellS2C;
1346    use crate::channel::{test::new_reactor, CodecError};
1347    use crate::congestion::test_utils::params::build_cc_vegas_params;
1348    use crate::crypto::cell::RelayCellBody;
1349    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1350    #[cfg(feature = "hs-service")]
1351    use crate::stream::IncomingStreamRequestFilter;
1352    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1353    use futures::channel::mpsc::{Receiver, Sender};
1354    use futures::io::{AsyncReadExt, AsyncWriteExt};
1355    use futures::sink::SinkExt;
1356    use futures::stream::StreamExt;
1357    use futures::task::SpawnExt;
1358    use hex_literal::hex;
1359    use std::collections::{HashMap, VecDeque};
1360    use std::fmt::Debug;
1361    use std::time::Duration;
1362    use tor_basic_utils::test_rng::testing_rng;
1363    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCmd};
1364    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1365    use tor_cell::relaycell::msg::SendmeTag;
1366    use tor_cell::relaycell::{
1367        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1368    };
1369    use tor_linkspec::OwnedCircTarget;
1370    use tor_memquota::HasMemoryCost;
1371    use tor_rtcompat::Runtime;
1372    use tracing::trace;
1373    use tracing_test::traced_test;
1374
1375    impl PendingClientCirc {
1376        /// Testing only: Extract the circuit ID for this pending circuit.
1377        pub(crate) fn peek_circid(&self) -> CircId {
1378            self.circ.circid
1379        }
1380    }
1381
1382    impl ClientCirc {
1383        /// Testing only: Extract the circuit ID of this circuit.
1384        pub(crate) fn peek_circid(&self) -> CircId {
1385            self.circid
1386        }
1387    }
1388
1389    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1390        // TODO #1947: test other formats.
1391        let rfmt = RelayCellFormat::V0;
1392        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1393            .encode(rfmt, &mut testing_rng())
1394            .unwrap();
1395        let chanmsg = chanmsg::Relay::from(body);
1396        ClientCircChanMsg::Relay(chanmsg)
1397    }
1398
1399    // Example relay IDs and keys
1400    const EXAMPLE_SK: [u8; 32] =
1401        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1402    const EXAMPLE_PK: [u8; 32] =
1403        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1404    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1405    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1406
1407    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
1408    #[cfg(test)]
1409    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
1410        buffer: usize,
1411    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
1412        crate::fake_mpsc(buffer)
1413    }
1414
1415    /// return an example OwnedCircTarget that can get used for an ntor handshake.
1416    fn example_target() -> OwnedCircTarget {
1417        let mut builder = OwnedCircTarget::builder();
1418        builder
1419            .chan_target()
1420            .ed_identity(EXAMPLE_ED_ID.into())
1421            .rsa_identity(EXAMPLE_RSA_ID.into());
1422        builder
1423            .ntor_onion_key(EXAMPLE_PK.into())
1424            .protocols("FlowCtrl=1".parse().unwrap())
1425            .build()
1426            .unwrap()
1427    }
1428    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1429        crate::crypto::handshake::ntor::NtorSecretKey::new(
1430            EXAMPLE_SK.into(),
1431            EXAMPLE_PK.into(),
1432            EXAMPLE_RSA_ID.into(),
1433        )
1434    }
1435    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1436        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1437            EXAMPLE_SK.into(),
1438            EXAMPLE_PK.into(),
1439            EXAMPLE_ED_ID.into(),
1440        )
1441    }
1442
1443    fn working_fake_channel<R: Runtime>(
1444        rt: &R,
1445    ) -> (
1446        Arc<Channel>,
1447        Receiver<AnyChanCell>,
1448        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1449    ) {
1450        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1451        rt.spawn(async {
1452            let _ignore = chan_reactor.run().await;
1453        })
1454        .unwrap();
1455        (channel, rx, tx)
1456    }
1457
1458    /// Which handshake type to use.
1459    #[derive(Copy, Clone)]
1460    enum HandshakeType {
1461        Fast,
1462        Ntor,
1463        NtorV3,
1464    }
1465
1466    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1467        // We want to try progressing from a pending circuit to a circuit
1468        // via a crate_fast handshake.
1469
1470        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1471
1472        let (chan, mut rx, _sink) = working_fake_channel(rt);
1473        let circid = CircId::new(128).unwrap();
1474        let (created_send, created_recv) = oneshot::channel();
1475        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1476        let unique_id = UniqId::new(23, 17);
1477
1478        let (pending, reactor) = PendingClientCirc::new(
1479            circid,
1480            chan,
1481            created_recv,
1482            circmsg_recv,
1483            unique_id,
1484            DynTimeProvider::new(rt.clone()),
1485            CircuitAccount::new_noop(),
1486        );
1487
1488        rt.spawn(async {
1489            let _ignore = reactor.run().await;
1490        })
1491        .unwrap();
1492
1493        // Future to pretend to be a relay on the other end of the circuit.
1494        let simulate_relay_fut = async move {
1495            let mut rng = testing_rng();
1496            let create_cell = rx.next().await.unwrap();
1497            assert_eq!(create_cell.circid(), Some(circid));
1498            let reply = match handshake_type {
1499                HandshakeType::Fast => {
1500                    let cf = match create_cell.msg() {
1501                        AnyChanMsg::CreateFast(cf) => cf,
1502                        other => panic!("{:?}", other),
1503                    };
1504                    let (_, rep) = CreateFastServer::server(
1505                        &mut rng,
1506                        &mut |_: &()| Some(()),
1507                        &[()],
1508                        cf.handshake(),
1509                    )
1510                    .unwrap();
1511                    CreateResponse::CreatedFast(CreatedFast::new(rep))
1512                }
1513                HandshakeType::Ntor => {
1514                    let c2 = match create_cell.msg() {
1515                        AnyChanMsg::Create2(c2) => c2,
1516                        other => panic!("{:?}", other),
1517                    };
1518                    let (_, rep) = NtorServer::server(
1519                        &mut rng,
1520                        &mut |_: &()| Some(()),
1521                        &[example_ntor_key()],
1522                        c2.body(),
1523                    )
1524                    .unwrap();
1525                    CreateResponse::Created2(Created2::new(rep))
1526                }
1527                HandshakeType::NtorV3 => {
1528                    let c2 = match create_cell.msg() {
1529                        AnyChanMsg::Create2(c2) => c2,
1530                        other => panic!("{:?}", other),
1531                    };
1532                    let mut reply_fn = if with_cc {
1533                        |client_exts: &[CircRequestExt]| {
1534                            let _ = client_exts
1535                                .iter()
1536                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1537                                .expect("Client failed to request CC");
1538                            // This needs to be aligned to test_utils params
1539                            // value due to validation that needs it in range.
1540                            Some(vec![CircResponseExt::CcResponse(
1541                                extend_ext::CcResponse::new(31),
1542                            )])
1543                        }
1544                    } else {
1545                        |_: &_| Some(vec![])
1546                    };
1547                    let (_, rep) = NtorV3Server::server(
1548                        &mut rng,
1549                        &mut reply_fn,
1550                        &[example_ntor_v3_key()],
1551                        c2.body(),
1552                    )
1553                    .unwrap();
1554                    CreateResponse::Created2(Created2::new(rep))
1555                }
1556            };
1557            created_send.send(reply).unwrap();
1558        };
1559        // Future to pretend to be a client.
1560        let client_fut = async move {
1561            let target = example_target();
1562            let params = CircParameters::default();
1563            let ret = match handshake_type {
1564                HandshakeType::Fast => {
1565                    trace!("doing fast create");
1566                    pending.create_firsthop_fast(params).await
1567                }
1568                HandshakeType::Ntor => {
1569                    trace!("doing ntor create");
1570                    pending.create_firsthop_ntor(&target, params).await
1571                }
1572                HandshakeType::NtorV3 => {
1573                    let params = if with_cc {
1574                        // Setup CC vegas parameters.
1575                        CircParameters::new(true, build_cc_vegas_params())
1576                    } else {
1577                        params
1578                    };
1579                    trace!("doing ntor_v3 create");
1580                    pending.create_firsthop_ntor_v3(&target, params).await
1581                }
1582            };
1583            trace!("create done: result {:?}", ret);
1584            ret
1585        };
1586
1587        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1588
1589        let _circ = circ.unwrap();
1590
1591        // pfew!  We've build a circuit!  Let's make sure it has one hop.
1592        assert_eq!(_circ.n_hops().unwrap(), 1);
1593    }
1594
1595    #[traced_test]
1596    #[test]
1597    fn test_create_fast() {
1598        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1599            test_create(&rt, HandshakeType::Fast, false).await;
1600        });
1601    }
1602    #[traced_test]
1603    #[test]
1604    fn test_create_ntor() {
1605        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1606            test_create(&rt, HandshakeType::Ntor, false).await;
1607        });
1608    }
1609    #[traced_test]
1610    #[test]
1611    fn test_create_ntor_v3() {
1612        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1613            test_create(&rt, HandshakeType::NtorV3, false).await;
1614        });
1615    }
1616    #[traced_test]
1617    #[test]
1618    #[cfg(feature = "flowctl-cc")]
1619    fn test_create_ntor_v3_with_cc() {
1620        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1621            test_create(&rt, HandshakeType::NtorV3, true).await;
1622        });
1623    }
1624
1625    // An encryption layer that doesn't do any crypto.   Can be used
1626    // as inbound or outbound, but not both at once.
1627    pub(crate) struct DummyCrypto {
1628        counter_tag: [u8; 20],
1629        counter: u32,
1630        lasthop: bool,
1631    }
1632    impl DummyCrypto {
1633        fn next_tag(&mut self) -> SendmeTag {
1634            #![allow(clippy::identity_op)]
1635            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1636            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1637            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1638            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1639            self.counter += 1;
1640            self.counter_tag.into()
1641        }
1642    }
1643
1644    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1645        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1646            self.next_tag()
1647        }
1648        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1649    }
1650    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1651        fn decrypt_inbound(
1652            &mut self,
1653            _cmd: ChanCmd,
1654            _cell: &mut RelayCellBody,
1655        ) -> Option<SendmeTag> {
1656            if self.lasthop {
1657                Some(self.next_tag())
1658            } else {
1659                None
1660            }
1661        }
1662    }
1663    impl DummyCrypto {
1664        pub(crate) fn new(lasthop: bool) -> Self {
1665            DummyCrypto {
1666                counter_tag: [0; 20],
1667                counter: 0,
1668                lasthop,
1669            }
1670        }
1671    }
1672
1673    // Helper: set up a 3-hop circuit with no encryption, where the
1674    // next inbound message seems to come from hop next_msg_from
1675    async fn newcirc_ext<R: Runtime>(
1676        rt: &R,
1677        chan: Arc<Channel>,
1678        next_msg_from: HopNum,
1679    ) -> (Arc<ClientCirc>, CircuitRxSender) {
1680        let circid = CircId::new(128).unwrap();
1681        let (_created_send, created_recv) = oneshot::channel();
1682        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1683        let unique_id = UniqId::new(23, 17);
1684
1685        let (pending, reactor) = PendingClientCirc::new(
1686            circid,
1687            chan,
1688            created_recv,
1689            circmsg_recv,
1690            unique_id,
1691            DynTimeProvider::new(rt.clone()),
1692            CircuitAccount::new_noop(),
1693        );
1694
1695        rt.spawn(async {
1696            let _ignore = reactor.run().await;
1697        })
1698        .unwrap();
1699
1700        let PendingClientCirc {
1701            circ,
1702            recvcreated: _,
1703        } = pending;
1704
1705        // TODO #1067: Support other formats
1706        let relay_cell_format = RelayCellFormat::V0;
1707        for idx in 0_u8..3 {
1708            let params = CircParameters::default();
1709            let (tx, rx) = oneshot::channel();
1710            circ.command
1711                .unbounded_send(CtrlCmd::AddFakeHop {
1712                    relay_cell_format,
1713                    fwd_lasthop: idx == 2,
1714                    rev_lasthop: idx == u8::from(next_msg_from),
1715                    params,
1716                    done: tx,
1717                })
1718                .unwrap();
1719            rx.await.unwrap().unwrap();
1720        }
1721
1722        (circ, circmsg_send)
1723    }
1724
1725    // Helper: set up a 3-hop circuit with no encryption, where the
1726    // next inbound message seems to come from hop next_msg_from
1727    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1728        newcirc_ext(rt, chan, 2.into()).await
1729    }
1730
1731    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1732        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1733
1734        let (chan, mut rx, _sink) = working_fake_channel(rt);
1735        let (circ, mut sink) = newcirc(rt, chan).await;
1736        let circid = circ.peek_circid();
1737        let params = CircParameters::default();
1738
1739        let extend_fut = async move {
1740            let target = example_target();
1741            match handshake_type {
1742                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1743                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1744                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1745            };
1746            circ // gotta keep the circ alive, or the reactor would exit.
1747        };
1748        let reply_fut = async move {
1749            // We've disabled encryption on this circuit, so we can just
1750            // read the extend2 cell.
1751            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1752            assert_eq!(id, Some(circid));
1753            let rmsg = match chmsg {
1754                AnyChanMsg::RelayEarly(r) => {
1755                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1756                        .unwrap()
1757                }
1758                other => panic!("{:?}", other),
1759            };
1760            let e2 = match rmsg.msg() {
1761                AnyRelayMsg::Extend2(e2) => e2,
1762                other => panic!("{:?}", other),
1763            };
1764            let mut rng = testing_rng();
1765            let reply = match handshake_type {
1766                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1767                HandshakeType::Ntor => {
1768                    let (_keygen, reply) = NtorServer::server(
1769                        &mut rng,
1770                        &mut |_: &()| Some(()),
1771                        &[example_ntor_key()],
1772                        e2.handshake(),
1773                    )
1774                    .unwrap();
1775                    reply
1776                }
1777                HandshakeType::NtorV3 => {
1778                    let (_keygen, reply) = NtorV3Server::server(
1779                        &mut rng,
1780                        &mut |_: &[CircRequestExt]| Some(vec![]),
1781                        &[example_ntor_v3_key()],
1782                        e2.handshake(),
1783                    )
1784                    .unwrap();
1785                    reply
1786                }
1787            };
1788
1789            let extended2 = relaymsg::Extended2::new(reply).into();
1790            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
1791            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
1792        };
1793
1794        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1795
1796        // Did we really add another hop?
1797        assert_eq!(circ.n_hops().unwrap(), 4);
1798
1799        // Do the path accessors report a reasonable outcome?
1800        {
1801            let path = circ
1802                .path_ref()
1803                .unwrap()
1804                .all_hops()
1805                .into_iter()
1806                .filter_map(|hop| match hop {
1807                    path::HopDetail::Relay(r) => Some(r),
1808                    #[cfg(feature = "hs-common")]
1809                    path::HopDetail::Virtual => None,
1810                })
1811                .collect::<Vec<_>>();
1812
1813            assert_eq!(path.len(), 4);
1814            use tor_linkspec::HasRelayIds;
1815            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1816            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1817        }
1818        {
1819            let path = circ.path_ref().unwrap();
1820            assert_eq!(path.n_hops(), 4);
1821            use tor_linkspec::HasRelayIds;
1822            assert_eq!(
1823                path.hops()[3].as_chan_target().unwrap().ed_identity(),
1824                example_target().ed_identity()
1825            );
1826            assert_ne!(
1827                path.hops()[0].as_chan_target().unwrap().ed_identity(),
1828                example_target().ed_identity()
1829            );
1830        }
1831    }
1832
1833    #[traced_test]
1834    #[test]
1835    fn test_extend_ntor() {
1836        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1837            test_extend(&rt, HandshakeType::Ntor).await;
1838        });
1839    }
1840
1841    #[traced_test]
1842    #[test]
1843    fn test_extend_ntor_v3() {
1844        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1845            test_extend(&rt, HandshakeType::NtorV3).await;
1846        });
1847    }
1848
1849    async fn bad_extend_test_impl<R: Runtime>(
1850        rt: &R,
1851        reply_hop: HopNum,
1852        bad_reply: ClientCircChanMsg,
1853    ) -> Error {
1854        let (chan, _rx, _sink) = working_fake_channel(rt);
1855        let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
1856        let params = CircParameters::default();
1857
1858        let target = example_target();
1859        #[allow(clippy::clone_on_copy)]
1860        let rtc = rt.clone();
1861        let sink_handle = rt
1862            .spawn_with_handle(async move {
1863                rtc.sleep(Duration::from_millis(100)).await;
1864                sink.send(bad_reply).await.unwrap();
1865                sink
1866            })
1867            .unwrap();
1868        let outcome = circ.extend_ntor(&target, params).await;
1869        let _sink = sink_handle.await;
1870
1871        assert_eq!(circ.n_hops().unwrap(), 3);
1872        assert!(outcome.is_err());
1873        outcome.unwrap_err()
1874    }
1875
1876    #[traced_test]
1877    #[test]
1878    fn bad_extend_wronghop() {
1879        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1880            let extended2 = relaymsg::Extended2::new(vec![]).into();
1881            let cc = rmsg_to_ccmsg(None, extended2);
1882
1883            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1884            // This case shows up as a CircDestroy, since a message sent
1885            // from the wrong hop won't even be delivered to the extend
1886            // code's meta-handler.  Instead the unexpected message will cause
1887            // the circuit to get torn down.
1888            match error {
1889                Error::CircuitClosed => {}
1890                x => panic!("got other error: {}", x),
1891            }
1892        });
1893    }
1894
1895    #[traced_test]
1896    #[test]
1897    fn bad_extend_wrongtype() {
1898        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1899            let extended = relaymsg::Extended::new(vec![7; 200]).into();
1900            let cc = rmsg_to_ccmsg(None, extended);
1901
1902            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1903            match error {
1904                Error::BytesErr {
1905                    err: tor_bytes::Error::InvalidMessage(_),
1906                    object: "extended2 message",
1907                } => {}
1908                other => panic!("{:?}", other),
1909            }
1910        });
1911    }
1912
1913    #[traced_test]
1914    #[test]
1915    fn bad_extend_destroy() {
1916        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1917            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1918            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1919            match error {
1920                Error::CircuitClosed => {}
1921                other => panic!("{:?}", other),
1922            }
1923        });
1924    }
1925
1926    #[traced_test]
1927    #[test]
1928    fn bad_extend_crypto() {
1929        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1930            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1931            let cc = rmsg_to_ccmsg(None, extended2);
1932            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1933            assert!(matches!(error, Error::BadCircHandshakeAuth));
1934        });
1935    }
1936
1937    #[traced_test]
1938    #[test]
1939    fn begindir() {
1940        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1941            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1942            let (circ, mut sink) = newcirc(&rt, chan).await;
1943            let circid = circ.peek_circid();
1944
1945            let begin_and_send_fut = async move {
1946                // Here we'll say we've got a circuit, and we want to
1947                // make a simple BEGINDIR request with it.
1948                let mut stream = circ.begin_dir_stream().await.unwrap();
1949                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1950                stream.flush().await.unwrap();
1951                let mut buf = [0_u8; 1024];
1952                let n = stream.read(&mut buf).await.unwrap();
1953                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1954                let n = stream.read(&mut buf).await.unwrap();
1955                assert_eq!(n, 0);
1956                stream
1957            };
1958            let reply_fut = async move {
1959                // We've disabled encryption on this circuit, so we can just
1960                // read the begindir cell.
1961                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1962                assert_eq!(id, Some(circid));
1963                let rmsg = match chmsg {
1964                    AnyChanMsg::Relay(r) => {
1965                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1966                            .unwrap()
1967                    }
1968                    other => panic!("{:?}", other),
1969                };
1970                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1971                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
1972
1973                // Reply with a Connected cell to indicate success.
1974                let connected = relaymsg::Connected::new_empty().into();
1975                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1976
1977                // Now read a DATA cell...
1978                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1979                assert_eq!(id, Some(circid));
1980                let rmsg = match chmsg {
1981                    AnyChanMsg::Relay(r) => {
1982                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1983                            .unwrap()
1984                    }
1985                    other => panic!("{:?}", other),
1986                };
1987                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1988                assert_eq!(streamid_2, streamid);
1989                if let AnyRelayMsg::Data(d) = rmsg {
1990                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1991                } else {
1992                    panic!();
1993                }
1994
1995                // Write another data cell in reply!
1996                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1997                    .unwrap()
1998                    .into();
1999                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2000
2001                // Send an END cell to say that the conversation is over.
2002                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2003                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2004
2005                (rx, sink) // gotta keep these alive, or the reactor will exit.
2006            };
2007
2008            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2009        });
2010    }
2011
2012    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
2013    fn close_stream_helper(by_drop: bool) {
2014        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2015            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2016            let (circ, mut sink) = newcirc(&rt, chan).await;
2017
2018            let stream_fut = async move {
2019                let stream = circ
2020                    .begin_stream("www.example.com", 80, None)
2021                    .await
2022                    .unwrap();
2023
2024                let (r, mut w) = stream.split();
2025                if by_drop {
2026                    // Drop the writer and the reader, which should close the stream.
2027                    drop(r);
2028                    drop(w);
2029                    (None, circ) // make sure to keep the circuit alive
2030                } else {
2031                    // Call close on the writer, while keeping the reader alive.
2032                    w.close().await.unwrap();
2033                    (Some(r), circ)
2034                }
2035            };
2036            let handler_fut = async {
2037                // Read the BEGIN message.
2038                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2039                let rmsg = match msg {
2040                    AnyChanMsg::Relay(r) => {
2041                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2042                            .unwrap()
2043                    }
2044                    other => panic!("{:?}", other),
2045                };
2046                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2047                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2048
2049                // Reply with a CONNECTED.
2050                let connected =
2051                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2052                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2053
2054                // Expect an END.
2055                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2056                let rmsg = match msg {
2057                    AnyChanMsg::Relay(r) => {
2058                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2059                            .unwrap()
2060                    }
2061                    other => panic!("{:?}", other),
2062                };
2063                let (_, rmsg) = rmsg.into_streamid_and_msg();
2064                assert_eq!(rmsg.cmd(), RelayCmd::END);
2065
2066                (rx, sink) // keep these alive or the reactor will exit.
2067            };
2068
2069            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2070        });
2071    }
2072
2073    #[traced_test]
2074    #[test]
2075    fn drop_stream() {
2076        close_stream_helper(true);
2077    }
2078
2079    #[traced_test]
2080    #[test]
2081    fn close_stream() {
2082        close_stream_helper(false);
2083    }
2084
2085    // Set up a circuit and stream that expects some incoming SENDMEs.
2086    async fn setup_incoming_sendme_case<R: Runtime>(
2087        rt: &R,
2088        n_to_send: usize,
2089    ) -> (
2090        Arc<ClientCirc>,
2091        DataStream,
2092        CircuitRxSender,
2093        Option<StreamId>,
2094        usize,
2095        Receiver<AnyChanCell>,
2096        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2097    ) {
2098        let (chan, mut rx, sink2) = working_fake_channel(rt);
2099        let (circ, mut sink) = newcirc(rt, chan).await;
2100        let circid = circ.peek_circid();
2101
2102        let begin_and_send_fut = {
2103            let circ = circ.clone();
2104            async move {
2105                // Take our circuit and make a stream on it.
2106                let mut stream = circ
2107                    .begin_stream("www.example.com", 443, None)
2108                    .await
2109                    .unwrap();
2110                let junk = [0_u8; 1024];
2111                let mut remaining = n_to_send;
2112                while remaining > 0 {
2113                    let n = std::cmp::min(remaining, junk.len());
2114                    stream.write_all(&junk[..n]).await.unwrap();
2115                    remaining -= n;
2116                }
2117                stream.flush().await.unwrap();
2118                stream
2119            }
2120        };
2121
2122        let receive_fut = async move {
2123            // Read the begin cell.
2124            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2125            let rmsg = match chmsg {
2126                AnyChanMsg::Relay(r) => {
2127                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2128                        .unwrap()
2129                }
2130                other => panic!("{:?}", other),
2131            };
2132            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2133            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2134            // Reply with a connected cell...
2135            let connected = relaymsg::Connected::new_empty().into();
2136            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2137            // Now read bytes from the stream until we have them all.
2138            let mut bytes_received = 0_usize;
2139            let mut cells_received = 0_usize;
2140            while bytes_received < n_to_send {
2141                // Read a data cell, and remember how much we got.
2142                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2143                assert_eq!(id, Some(circid));
2144
2145                let rmsg = match chmsg {
2146                    AnyChanMsg::Relay(r) => {
2147                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2148                            .unwrap()
2149                    }
2150                    other => panic!("{:?}", other),
2151                };
2152                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2153                assert_eq!(streamid2, streamid);
2154                if let AnyRelayMsg::Data(dat) = rmsg {
2155                    cells_received += 1;
2156                    bytes_received += dat.as_ref().len();
2157                } else {
2158                    panic!();
2159                }
2160            }
2161
2162            (sink, streamid, cells_received, rx)
2163        };
2164
2165        let (stream, (sink, streamid, cells_received, rx)) =
2166            futures::join!(begin_and_send_fut, receive_fut);
2167
2168        (circ, stream, sink, streamid, cells_received, rx, sink2)
2169    }
2170
2171    #[traced_test]
2172    #[test]
2173    fn accept_valid_sendme() {
2174        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2175            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2176                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2177
2178            assert_eq!(cells_received, 301);
2179
2180            // Make sure that the circuit is indeed expecting the right sendmes
2181            {
2182                let (tx, rx) = oneshot::channel();
2183                circ.command
2184                    .unbounded_send(CtrlCmd::QuerySendWindow {
2185                        hop: 2.into(),
2186                        done: tx,
2187                    })
2188                    .unwrap();
2189                let (window, tags) = rx.await.unwrap().unwrap();
2190                assert_eq!(window, 1000 - 301);
2191                assert_eq!(tags.len(), 3);
2192                // 100
2193                assert_eq!(
2194                    tags[0],
2195                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2196                );
2197                // 200
2198                assert_eq!(
2199                    tags[1],
2200                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2201                );
2202                // 300
2203                assert_eq!(
2204                    tags[2],
2205                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2206                );
2207            }
2208
2209            let reply_with_sendme_fut = async move {
2210                // make and send a circuit-level sendme.
2211                let c_sendme =
2212                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2213                        .into();
2214                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2215
2216                // Make and send a stream-level sendme.
2217                let s_sendme = relaymsg::Sendme::new_empty().into();
2218                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2219
2220                sink
2221            };
2222
2223            let _sink = reply_with_sendme_fut.await;
2224
2225            rt.advance_until_stalled().await;
2226
2227            // Now make sure that the circuit is still happy, and its
2228            // window is updated.
2229            {
2230                let (tx, rx) = oneshot::channel();
2231                circ.command
2232                    .unbounded_send(CtrlCmd::QuerySendWindow {
2233                        hop: 2.into(),
2234                        done: tx,
2235                    })
2236                    .unwrap();
2237                let (window, _tags) = rx.await.unwrap().unwrap();
2238                assert_eq!(window, 1000 - 201);
2239            }
2240        });
2241    }
2242
2243    #[traced_test]
2244    #[test]
2245    fn invalid_circ_sendme() {
2246        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2247            // Same setup as accept_valid_sendme() test above but try giving
2248            // a sendme with the wrong tag.
2249
2250            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2251                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2252
2253            let reply_with_sendme_fut = async move {
2254                // make and send a circuit-level sendme with a bad tag.
2255                let c_sendme =
2256                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2257                        .into();
2258                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2259                sink
2260            };
2261
2262            let _sink = reply_with_sendme_fut.await;
2263
2264            // Check whether the reactor dies as a result of receiving invalid data.
2265            rt.advance_until_stalled().await;
2266            assert!(circ.is_closing());
2267        });
2268    }
2269
2270    #[traced_test]
2271    #[test]
2272    fn test_busy_stream_fairness() {
2273        // Number of streams to use.
2274        const N_STREAMS: usize = 3;
2275        // Number of cells (roughly) for each stream to send.
2276        const N_CELLS: usize = 20;
2277        // Number of bytes that *each* stream will send, and that we'll read
2278        // from the channel.
2279        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2280        // Ignoring cell granularity, with perfect fairness we'd expect
2281        // `N_BYTES/N_STREAMS` bytes from each stream.
2282        //
2283        // We currently allow for up to a full cell less than that.  This is
2284        // somewhat arbitrary and can be changed as needed, since we don't
2285        // provide any specific fairness guarantees.
2286        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2287            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2288
2289        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2290            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2291            let (circ, mut sink) = newcirc(&rt, chan).await;
2292
2293            // Run clients in a single task, doing our own round-robin
2294            // scheduling of writes to the reactor. Conversely, if we were to
2295            // put each client in its own task, we would be at the the mercy of
2296            // how fairly the runtime schedules the client tasks, which is outside
2297            // the scope of this test.
2298            rt.spawn({
2299                // Clone the circuit to keep it alive after writers have
2300                // finished with it.
2301                let circ = circ.clone();
2302                async move {
2303                    let mut clients = VecDeque::new();
2304                    struct Client {
2305                        stream: DataStream,
2306                        to_write: &'static [u8],
2307                    }
2308                    for _ in 0..N_STREAMS {
2309                        clients.push_back(Client {
2310                            stream: circ
2311                                .begin_stream("www.example.com", 80, None)
2312                                .await
2313                                .unwrap(),
2314                            to_write: &[0_u8; N_BYTES][..],
2315                        });
2316                    }
2317                    while let Some(mut client) = clients.pop_front() {
2318                        if client.to_write.is_empty() {
2319                            // Client is done. Don't put back in queue.
2320                            continue;
2321                        }
2322                        let written = client.stream.write(client.to_write).await.unwrap();
2323                        client.to_write = &client.to_write[written..];
2324                        clients.push_back(client);
2325                    }
2326                }
2327            })
2328            .unwrap();
2329
2330            let channel_handler_fut = async {
2331                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2332                let mut total_bytes_received = 0;
2333
2334                loop {
2335                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2336                    let rmsg = match msg {
2337                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2338                            RelayCellFormat::V0,
2339                            r.into_relay_body(),
2340                        )
2341                        .unwrap(),
2342                        other => panic!("Unexpected chanmsg: {other:?}"),
2343                    };
2344                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2345                    match rmsg.cmd() {
2346                        RelayCmd::BEGIN => {
2347                            // Add an entry for this stream.
2348                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2349                            assert_eq!(prev, None);
2350                            // Reply with a CONNECTED.
2351                            let connected = relaymsg::Connected::new_with_addr(
2352                                "10.0.0.1".parse().unwrap(),
2353                                1234,
2354                            )
2355                            .into();
2356                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2357                        }
2358                        RelayCmd::DATA => {
2359                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2360                            let nbytes = data_msg.as_ref().len();
2361                            total_bytes_received += nbytes;
2362                            let streamid = streamid.unwrap();
2363                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2364                            *stream_bytes += nbytes;
2365                            if total_bytes_received >= N_BYTES {
2366                                break;
2367                            }
2368                        }
2369                        RelayCmd::END => {
2370                            // Stream is done. If fair scheduling is working as
2371                            // expected we *probably* shouldn't get here, but we
2372                            // can ignore it and save the failure until we
2373                            // actually have the final stats.
2374                            continue;
2375                        }
2376                        other => {
2377                            panic!("Unexpected command {other:?}");
2378                        }
2379                    }
2380                }
2381
2382                // Return our stats, along with the `rx` and `sink` to keep the
2383                // reactor alive (since clients could still be writing).
2384                (total_bytes_received, stream_bytes_received, rx, sink)
2385            };
2386
2387            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2388                channel_handler_fut.await;
2389            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2390            for (sid, stream_bytes) in stream_bytes_received {
2391                assert!(
2392                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2393                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2394                );
2395            }
2396        });
2397    }
2398
2399    #[test]
2400    fn basic_params() {
2401        use super::CircParameters;
2402        let mut p = CircParameters::default();
2403        assert!(p.extend_by_ed25519_id);
2404
2405        p.extend_by_ed25519_id = false;
2406        assert!(!p.extend_by_ed25519_id);
2407    }
2408
2409    #[cfg(feature = "hs-service")]
2410    struct AllowAllStreamsFilter;
2411    #[cfg(feature = "hs-service")]
2412    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2413        fn disposition(
2414            &mut self,
2415            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
2416            _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
2417        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
2418            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
2419        }
2420    }
2421
2422    #[traced_test]
2423    #[test]
2424    #[cfg(feature = "hs-service")]
2425    fn allow_stream_requests_twice() {
2426        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2427            let (chan, _rx, _sink) = working_fake_channel(&rt);
2428            let (circ, _send) = newcirc(&rt, chan).await;
2429
2430            let _incoming = circ
2431                .allow_stream_requests(
2432                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2433                    circ.last_hop_num().unwrap(),
2434                    AllowAllStreamsFilter,
2435                )
2436                .await
2437                .unwrap();
2438
2439            let incoming = circ
2440                .allow_stream_requests(
2441                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2442                    circ.last_hop_num().unwrap(),
2443                    AllowAllStreamsFilter,
2444                )
2445                .await;
2446
2447            // There can only be one IncomingStream at a time on any given circuit.
2448            assert!(incoming.is_err());
2449        });
2450    }
2451
2452    #[traced_test]
2453    #[test]
2454    #[cfg(feature = "hs-service")]
2455    fn allow_stream_requests() {
2456        use tor_cell::relaycell::msg::BeginFlags;
2457
2458        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2459            const TEST_DATA: &[u8] = b"ping";
2460
2461            let (chan, _rx, _sink) = working_fake_channel(&rt);
2462            let (circ, mut send) = newcirc(&rt, chan).await;
2463
2464            let rfmt = RelayCellFormat::V0;
2465
2466            // A helper channel for coordinating the "client"/"service" interaction
2467            let (tx, rx) = oneshot::channel();
2468            let mut incoming = circ
2469                .allow_stream_requests(
2470                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2471                    circ.last_hop_num().unwrap(),
2472                    AllowAllStreamsFilter,
2473                )
2474                .await
2475                .unwrap();
2476
2477            let simulate_service = async move {
2478                let stream = incoming.next().await.unwrap();
2479                let mut data_stream = stream
2480                    .accept_data(relaymsg::Connected::new_empty())
2481                    .await
2482                    .unwrap();
2483                // Notify the client task we're ready to accept DATA cells
2484                tx.send(()).unwrap();
2485
2486                // Read the data the client sent us
2487                let mut buf = [0_u8; TEST_DATA.len()];
2488                data_stream.read_exact(&mut buf).await.unwrap();
2489                assert_eq!(&buf, TEST_DATA);
2490
2491                circ
2492            };
2493
2494            let simulate_client = async move {
2495                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2496                let body: BoxedCellBody =
2497                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2498                        .encode(rfmt, &mut testing_rng())
2499                        .unwrap();
2500                let begin_msg = chanmsg::Relay::from(body);
2501
2502                // Pretend to be a client at the other end of the circuit sending a begin cell
2503                send.send(ClientCircChanMsg::Relay(begin_msg))
2504                    .await
2505                    .unwrap();
2506
2507                // Wait until the service is ready to accept data
2508                // TODO: we shouldn't need to wait! This is needed because the service will reject
2509                // any DATA cells that aren't associated with a known stream. We need to wait until
2510                // the service receives our BEGIN cell (and the reactor updates hop.map with the
2511                // new stream).
2512                rx.await.unwrap();
2513                // Now send some data along the newly established circuit..
2514                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2515                let body: BoxedCellBody =
2516                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2517                        .encode(rfmt, &mut testing_rng())
2518                        .unwrap();
2519                let data_msg = chanmsg::Relay::from(body);
2520
2521                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2522                send
2523            };
2524
2525            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2526        });
2527    }
2528
2529    #[traced_test]
2530    #[test]
2531    #[cfg(feature = "hs-service")]
2532    fn accept_stream_after_reject() {
2533        use tor_cell::relaycell::msg::BeginFlags;
2534        use tor_cell::relaycell::msg::EndReason;
2535
2536        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2537            const TEST_DATA: &[u8] = b"ping";
2538            const STREAM_COUNT: usize = 2;
2539            let rfmt = RelayCellFormat::V0;
2540
2541            let (chan, _rx, _sink) = working_fake_channel(&rt);
2542            let (circ, mut send) = newcirc(&rt, chan).await;
2543
2544            // A helper channel for coordinating the "client"/"service" interaction
2545            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2546
2547            let mut incoming = circ
2548                .allow_stream_requests(
2549                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2550                    circ.last_hop_num().unwrap(),
2551                    AllowAllStreamsFilter,
2552                )
2553                .await
2554                .unwrap();
2555
2556            let simulate_service = async move {
2557                // Process 2 incoming streams
2558                for i in 0..STREAM_COUNT {
2559                    let stream = incoming.next().await.unwrap();
2560
2561                    // Reject the first one
2562                    if i == 0 {
2563                        stream
2564                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2565                            .await
2566                            .unwrap();
2567                        // Notify the client
2568                        tx.send(()).await.unwrap();
2569                        continue;
2570                    }
2571
2572                    let mut data_stream = stream
2573                        .accept_data(relaymsg::Connected::new_empty())
2574                        .await
2575                        .unwrap();
2576                    // Notify the client task we're ready to accept DATA cells
2577                    tx.send(()).await.unwrap();
2578
2579                    // Read the data the client sent us
2580                    let mut buf = [0_u8; TEST_DATA.len()];
2581                    data_stream.read_exact(&mut buf).await.unwrap();
2582                    assert_eq!(&buf, TEST_DATA);
2583                }
2584
2585                circ
2586            };
2587
2588            let simulate_client = async move {
2589                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2590                let body: BoxedCellBody =
2591                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2592                        .encode(rfmt, &mut testing_rng())
2593                        .unwrap();
2594                let begin_msg = chanmsg::Relay::from(body);
2595
2596                // Pretend to be a client at the other end of the circuit sending 2 identical begin
2597                // cells (the first one will be rejected by the test service).
2598                for _ in 0..STREAM_COUNT {
2599                    send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
2600                        .await
2601                        .unwrap();
2602
2603                    // Wait until the service rejects our request
2604                    rx.next().await.unwrap();
2605                }
2606
2607                // Now send some data along the newly established circuit..
2608                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2609                let body: BoxedCellBody =
2610                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2611                        .encode(rfmt, &mut testing_rng())
2612                        .unwrap();
2613                let data_msg = chanmsg::Relay::from(body);
2614
2615                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2616                send
2617            };
2618
2619            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2620        });
2621    }
2622
2623    #[traced_test]
2624    #[test]
2625    #[cfg(feature = "hs-service")]
2626    fn incoming_stream_bad_hop() {
2627        use tor_cell::relaycell::msg::BeginFlags;
2628
2629        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2630            /// Expect the originator of the BEGIN cell to be hop 1.
2631            const EXPECTED_HOP: u8 = 1;
2632            let rfmt = RelayCellFormat::V0;
2633
2634            let (chan, _rx, _sink) = working_fake_channel(&rt);
2635            let (circ, mut send) = newcirc(&rt, chan).await;
2636
2637            // Expect to receive incoming streams from hop EXPECTED_HOP
2638            let mut incoming = circ
2639                .allow_stream_requests(
2640                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2641                    EXPECTED_HOP.into(),
2642                    AllowAllStreamsFilter,
2643                )
2644                .await
2645                .unwrap();
2646
2647            let simulate_service = async move {
2648                // The originator of the cell is actually the last hop on the circuit, not hop 1,
2649                // so we expect the reactor to shut down.
2650                assert!(incoming.next().await.is_none());
2651                circ
2652            };
2653
2654            let simulate_client = async move {
2655                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2656                let body: BoxedCellBody =
2657                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2658                        .encode(rfmt, &mut testing_rng())
2659                        .unwrap();
2660                let begin_msg = chanmsg::Relay::from(body);
2661
2662                // Pretend to be a client at the other end of the circuit sending a begin cell
2663                send.send(ClientCircChanMsg::Relay(begin_msg))
2664                    .await
2665                    .unwrap();
2666
2667                send
2668            };
2669
2670            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2671        });
2672    }
2673}