tor_proto/tunnel/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
3//! Right now, we only implement "client circuits" -- also sometimes
4//! called "origin circuits".  A client circuit is one that is
//! constructed by this Tor instance, and used on its own behalf to
//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_circ] method.  This yields
14//! a [PendingClientCirc] object that won't become live until you call
15//! one of the methods
16//! (typically [`PendingClientCirc::create_firsthop`])
17//! that extends it to its first hop.  After you've
18//! done that, you can call [`ClientCirc::extend`] on the circuit to
19//! build it into a multi-hop circuit.  Finally, you can use
20//! [ClientCirc::begin_stream] to get a Stream object that can be used
21//! for anonymized data.
22//!
23//! # Implementation
24//!
25//! Each open circuit has a corresponding Reactor object that runs in
26//! an asynchronous task, and manages incoming cells from the
27//! circuit's upstream channel.  These cells are either RELAY cells or
28//! DESTROY cells.  DESTROY cells are handled immediately.
29//! RELAY cells are either for a particular stream, in which case they
30//! get forwarded to a RawCellStream object, or for no particular stream,
31//! in which case they are considered "meta" cells (like EXTENDED2)
32//! that should only get accepted if something is waiting for them.
33//!
34//! # Limitations
35//!
36//! This is client-only.
37
38pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::congestion::params::CongestionControlParams;
51use crate::crypto::cell::HopNum;
52use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53use crate::memquota::{CircuitAccount, SpecificAccount as _};
54use crate::stream::{
55    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56    StreamRateLimit, StreamReceiver,
57};
58use crate::tunnel::circuit::celltypes::*;
59use crate::tunnel::reactor::CtrlCmd;
60use crate::tunnel::reactor::{
61    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62};
63use crate::tunnel::{StreamTarget, TargetHop};
64use crate::util::skew::ClockSkew;
65use crate::{Error, ResolveError, Result};
66use educe::Educe;
67use path::HopDetail;
68use postage::watch;
69use tor_cell::{
70    chancell::CircId,
71    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
72};
73
74use tor_error::{bad_api_usage, internal, into_internal};
75use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
76use tor_protover::named;
77
78pub use crate::crypto::binding::CircuitBinding;
79pub use crate::memquota::StreamAccount;
80pub use crate::tunnel::circuit::unique_id::UniqId;
81
82#[cfg(feature = "hs-service")]
83use {
84    crate::stream::{IncomingCmdChecker, IncomingStream},
85    crate::tunnel::reactor::StreamReqInfo,
86};
87
88use futures::channel::mpsc;
89use oneshot_fused_workaround as oneshot;
90
91use crate::congestion::sendme::StreamRecvWindow;
92use crate::DynTimeProvider;
93use futures::FutureExt as _;
94use std::collections::HashMap;
95use std::net::IpAddr;
96use std::sync::{Arc, Mutex};
97use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
98
99use crate::crypto::handshake::ntor::NtorPublicKey;
100
101pub use path::{Path, PathEntry};
102
/// The size of the buffer for communication between `ClientCirc` and its reactor.
//
// NOTE(review): the `control` and `command` senders held by `ClientCirc` in this
// file are `mpsc::UnboundedSender`s, so this bound is presumably applied elsewhere
// (e.g. where the circuit's channels are created) -- confirm against the users
// of this constant.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
105
106#[cfg(feature = "send-control-msg")]
107use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
108
109pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
110#[cfg(feature = "send-control-msg")]
111#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
112pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
113
/// MPSC queue relating to a stream (either inbound or outbound), sender
///
/// This is the [`mq_queue::Sender`] half, specialised to [`MpscSpec`].
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// MPSC queue relating to a stream (either inbound or outbound), receiver
///
/// This is the [`mq_queue::Receiver`] half, specialised to [`MpscSpec`].
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;

/// MPSC queue for inbound data on its way from channel to circuit, sender
///
/// The items carried by this queue are [`ClientCircChanMsg`]s.
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
/// MPSC queue for inbound data on its way from channel to circuit, receiver
///
/// The items carried by this queue are [`ClientCircChanMsg`]s.
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
123
#[derive(Debug)]
/// A circuit that we have constructed over the Tor network.
///
/// # Circuit life cycle
///
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
/// one of these, you invoke one of its `create_firsthop` methods (typically
/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
/// [`create_firsthop()`](PendingClientCirc::create_firsthop)).
/// Then, to add more hops to the circuit, you can call
/// [`extend()`](ClientCirc::extend) on it.
///
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
/// `tor-proto` are probably not what you need.
///
/// After a circuit is created, it will persist until it is closed in one of
/// five ways:
///    1. A remote error occurs.
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
///       circuit.
///    3. The circuit's channel is closed.
///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
///       circuit from closing until all those streams have gone away.)
///
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
/// will just be unusable for most purposes.  Most operations on it will fail
/// with an error.
//
// Effectively, this struct contains two Arcs: one for `path` and one for
// `control` (which surely has something Arc-like in it).  We cannot unify
// these by putting a single Arc around the whole struct, and passing
// an Arc strong reference to the `Reactor`, because then `control` would
// not be dropped when the last user of the circuit goes away.  We could
// make the reactor have a weak reference but weak references are more
// expensive to dereference.
//
// Because of the above, cloning this struct is always going to involve
// two atomic refcount changes/checks.  Wrapping it in another Arc would
// be overkill.
//
// NOTE(review): the comment above appears to predate the current layout.
// There is no `path` field any more: the path lives inside `mutable`
// (an `Arc<TunnelMutableState>`), and there are now two reactor senders
// (`control` and `command`).  Also, this struct derives only `Debug` here,
// and `PendingClientCirc` holds it as `Arc<ClientCirc>` -- confirm whether
// the remarks about cloning still apply.
//
pub struct ClientCirc {
    /// Mutable state shared with the `Reactor`.
    ///
    /// Entries in this map are looked up using `unique_id`.
    mutable: Arc<TunnelMutableState>,
    /// A unique identifier for this circuit.
    unique_id: UniqId,
    /// Channel to send control messages to the reactor.
    ///
    /// Carries [`CtrlMsg`] values.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Channel to send commands to the reactor.
    ///
    /// Carries [`CtrlCmd`] values.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// A future that resolves to Cancelled once the reactor is shut down,
    /// meaning that the circuit is closed.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// For testing purposes: the CircId, for use in peek_circid().
    #[cfg(test)]
    circid: CircId,
    /// Memory quota account
    memquota: CircuitAccount,
    /// Time provider
    time_provider: DynTimeProvider,
}
188
/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
///
/// This is a map from each circuit's [`UniqId`] to that circuit's [`MutableState`],
/// protected by a mutex.
///
/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
/// but it is currently the best option we have for sharing
/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
/// In practice, these mutexes won't be accessed very often
/// (they're accessed for writing when a circuit is extended,
/// and for reading by the various `ClientCirc` APIs),
/// so they shouldn't really impact performance.
///
/// (The "inner" mutex is the one inside each [`MutableState`].)
///
/// Alternatively, the circuit state information could be shared
/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
/// asynchronous, which will significantly complicate their callsites,
/// which would in turn need to be made async too.
///
/// We should revisit this decision at some point, and decide whether an async API
/// would be preferable.
#[derive(Debug, Default)]
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
210
211impl TunnelMutableState {
212    /// Add the [`MutableState`] of a circuit.
213    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
214        #[allow(unused)] // unused in non-debug builds
215        let state = self
216            .0
217            .lock()
218            .expect("lock poisoned")
219            .insert(unique_id, mutable);
220
221        debug_assert!(state.is_none());
222    }
223
224    /// Remove the [`MutableState`] of a circuit.
225    pub(super) fn remove(&self, unique_id: UniqId) {
226        #[allow(unused)] // unused in non-debug builds
227        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
228
229        debug_assert!(state.is_some());
230    }
231
232    /// Return a [`Path`] object describing all the hops in the specified circuit.
233    ///
234    /// See [`MutableState::path`].
235    fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
236        let lock = self.0.lock().expect("lock poisoned");
237        let mutable = lock
238            .get(&unique_id)
239            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
240
241        Ok(mutable.path())
242    }
243
244    /// Return a description of the first hop of this circuit.
245    ///
246    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
247    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
248    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
249        let lock = self.0.lock().expect("lock poisoned");
250        let mutable = lock
251            .get(&unique_id)
252            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
253
254        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
255            path::HopDetail::Relay(r) => r,
256            #[cfg(feature = "hs-common")]
257            path::HopDetail::Virtual => {
258                panic!("somehow made a circuit with a virtual first hop.")
259            }
260        });
261
262        Ok(first_hop)
263    }
264
265    /// Return the [`HopNum`] of the last hop of the specified circuit.
266    ///
267    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
268    ///
269    /// See [`MutableState::last_hop_num`].
270    fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
271        let lock = self.0.lock().expect("lock poisoned");
272        let mutable = lock
273            .get(&unique_id)
274            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
275
276        Ok(mutable.last_hop_num())
277    }
278
279    /// Return the number of hops in the specified circuit.
280    ///
281    /// See [`MutableState::n_hops`].
282    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
283        let lock = self.0.lock().expect("lock poisoned");
284        let mutable = lock
285            .get(&unique_id)
286            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
287
288        Ok(mutable.n_hops())
289    }
290
291    /// Return the number of legs in this tunnel.
292    ///
293    /// TODO(conflux-fork): this can be removed once we modify `path_ref`
294    /// to return *all* the Paths in the tunnel.
295    fn n_legs(&self) -> usize {
296        let lock = self.0.lock().expect("lock poisoned");
297        lock.len()
298    }
299}
300
/// The mutable state of a circuit.
///
/// This is a [`CircuitState`] behind a mutex.
/// The accessors on this type lock the mutex for the duration of the call,
/// and panic if the lock is poisoned.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState(Mutex<CircuitState>);
305
306impl MutableState {
307    /// Add a hop to the path of this circuit.
308    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
309        let mut mutable = self.0.lock().expect("poisoned lock");
310        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
311        mutable.binding.push(binding);
312    }
313
314    /// Get a copy of the circuit's current [`path::Path`].
315    pub(super) fn path(&self) -> Arc<path::Path> {
316        let mutable = self.0.lock().expect("poisoned lock");
317        Arc::clone(&mutable.path)
318    }
319
320    /// Return the cryptographic material used to prove knowledge of a shared
321    /// secret with with `hop`.
322    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
323        let mutable = self.0.lock().expect("poisoned lock");
324
325        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
326        // NOTE: I'm not thrilled to have to copy this information, but we use
327        // it very rarely, so it's not _that_ bad IMO.
328    }
329
330    /// Return a description of the first hop of this circuit.
331    fn first_hop(&self) -> Option<HopDetail> {
332        let mutable = self.0.lock().expect("poisoned lock");
333        mutable.path.first_hop()
334    }
335
336    /// Return the [`HopNum`] of the last hop of this circuit.
337    ///
338    /// NOTE: This function will return the [`HopNum`] of the hop
339    /// that is _currently_ the last. If there is an extend operation in progress,
340    /// the currently pending hop may or may not be counted, depending on whether
341    /// the extend operation finishes before this call is done.
342    fn last_hop_num(&self) -> Option<HopNum> {
343        let mutable = self.0.lock().expect("poisoned lock");
344        mutable.path.last_hop_num()
345    }
346
347    /// Return the number of hops in this circuit.
348    ///
349    /// NOTE: This function will currently return only the number of hops
350    /// _currently_ in the circuit. If there is an extend operation in progress,
351    /// the currently pending hop may or may not be counted, depending on whether
352    /// the extend operation finishes before this call is done.
353    fn n_hops(&self) -> usize {
354        let mutable = self.0.lock().expect("poisoned lock");
355        mutable.path.n_hops()
356    }
357}
358
/// The shared state of a circuit.
///
/// This is the data guarded by the mutex in [`MutableState`]:
/// the circuit's path, and the per-hop binding keys.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct CircuitState {
    /// Information about this circuit's path.
    ///
    /// This is stored in an Arc so that we can cheaply give a copy of it to
    /// client code; when we need to add a hop (which is less frequent) we use
    /// [`Arc::make_mut()`].
    path: Arc<path::Path>,

    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
    /// in the circuit's path.
    ///
    /// One entry is pushed each time a hop is added to `path`,
    /// so this is indexed by hop number.
    ///
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
    /// fair chance that this will change in the future, and I don't want other
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
    /// an `Option`.
    // Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    binding: Vec<Option<CircuitBinding>>,
}
380
/// A ClientCirc that needs to send a create cell and receive a created* cell.
///
/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
/// to negotiate the cryptographic handshake with the first hop.
pub struct PendingClientCirc {
    /// A oneshot receiver on which we'll receive a CREATED* cell,
    /// or a DESTROY cell.
    ///
    /// Either kind of reply arrives as a [`CreateResponse`].
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The ClientCirc object that we can expose on success.
    circ: Arc<ClientCirc>,
}
392
/// Description of the network's current rules for building circuits.
///
/// This type describes rules derived from the consensus,
/// and possibly amended by our own configuration.
///
/// Typically, this type is created once for an entire circuit,
/// and any special per-hop information is derived
/// from each hop as a CircTarget.
/// Note however that callers _may_ provide different `CircParameters`
/// for different hops within a circuit if they have some reason to do so,
/// so we do not enforce that every hop in a circuit has the same `CircParameters`.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// Whether we should include ed25519 identities when we send
    /// EXTEND2 cells.
    pub extend_by_ed25519_id: bool,
    /// Congestion control parameters for this circuit.
    pub ccontrol: CongestionControlParams,

    /// Maximum number of permitted incoming relay cells for each hop.
    ///
    /// If we would receive more relay cells than this from a single hop,
    /// we close the circuit with [`ExcessInboundCells`](Error::ExcessInboundCells).
    ///
    /// If this value is None, then there is no limit to the number of inbound cells.
    ///
    /// Known limitation: If this value is `u32::MAX`,
    /// then a limit of `u32::MAX - 1` is enforced.
    pub n_incoming_cells_permitted: Option<u32>,

    /// Maximum number of permitted outgoing relay cells for each hop.
    ///
    /// If we would try to send more relay cells than this from a single hop,
    /// we close the circuit with [`ExcessOutboundCells`](Error::ExcessOutboundCells).
    /// It is the circuit-user's responsibility to make sure that this does not happen.
    ///
    /// This setting is used to ensure that we do not violate a limit
    /// imposed by `n_incoming_cells_permitted`
    /// on the other side of a circuit.
    ///
    /// If this value is None, then there is no limit to the number of outbound cells.
    ///
    /// Known limitation: If this value is `u32::MAX`,
    /// then a limit of `u32::MAX - 1` is enforced.
    pub n_outgoing_cells_permitted: Option<u32>,
}
440
/// The settings we use for a single hop of a circuit.
///
/// Unlike [`CircParameters`], this type is crate-internal.
/// We construct it based on our settings from the circuit,
/// and from the hop's actual capabilities.
/// Then, we negotiate with the hop as part of circuit
/// creation/extension to determine the actual settings that will be in use.
/// Finally, we use those settings to construct the negotiated circuit hop.
//
// TODO: Relays should probably derive an instance of this type too, as
// part of the circuit creation handshake.
#[derive(Clone, Debug)]
pub(super) struct HopSettings {
    /// The negotiated congestion control settings for this circuit.
    pub(super) ccontrol: CongestionControlParams,

    /// Maximum number of permitted incoming relay cells for this hop.
    ///
    /// `None` means that there is no limit
    /// (see [`CircParameters::n_incoming_cells_permitted`]).
    pub(super) n_incoming_cells_permitted: Option<u32>,

    /// Maximum number of permitted outgoing relay cells for this hop.
    ///
    /// `None` means that there is no limit
    /// (see [`CircParameters::n_outgoing_cells_permitted`]).
    pub(super) n_outgoing_cells_permitted: Option<u32>,
}
463
464impl HopSettings {
465    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
466    /// and `caps` (a set of protocol capabilities for a circuit target).
467    ///
468    /// The resulting settings will represent what the client would prefer to negotiate
469    /// (determined by `params`),
470    /// as modified by what the target relay is believed to support (represented by `caps`).
471    ///
472    /// This represents the `HopSettings` in a pre-negotiation state:
473    /// the circuit negotiation process will modify it.
474    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
475    pub(super) fn from_params_and_caps(
476        params: &CircParameters,
477        caps: &tor_protover::Protocols,
478    ) -> Result<Self> {
479        let mut settings = Self {
480            ccontrol: params.ccontrol.clone(),
481            n_incoming_cells_permitted: params.n_incoming_cells_permitted,
482            n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
483        };
484
485        match settings.ccontrol.alg() {
486            crate::ccparams::Algorithm::FixedWindow(_) => {}
487            crate::ccparams::Algorithm::Vegas(_) => {
488                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
489                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
490                    settings.ccontrol.use_fallback_alg();
491                }
492            }
493        }
494
495        Ok(settings)
496    }
497
498    /// Return a new `HopSettings` based on this one,
499    /// representing the settings that we should use
500    /// if circuit negotiation will be impossible.
501    ///
502    /// (Circuit negotiation is impossible when using the legacy ntor protocol,
503    /// and when using CRATE_FAST.  It is currently unsupported with virtual hops.)
504    pub(super) fn without_negotiation(mut self) -> Self {
505        self.ccontrol.use_fallback_alg();
506        self
507    }
508}
509
#[cfg(test)]
impl std::default::Default for CircParameters {
    /// Parameters for use in tests: ed25519 identities in EXTEND2 cells enabled,
    /// fixed-window congestion control parameters from the test utilities,
    /// and no limits on incoming or outgoing cells.
    fn default() -> Self {
        let ccontrol = crate::congestion::test_utils::params::build_cc_fixed_params();

        Self {
            ccontrol,
            extend_by_ed25519_id: true,
            n_outgoing_cells_permitted: None,
            n_incoming_cells_permitted: None,
        }
    }
}
521
522impl CircParameters {
523    /// Constructor
524    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
525        Self {
526            extend_by_ed25519_id,
527            ccontrol,
528            n_incoming_cells_permitted: None,
529            n_outgoing_cells_permitted: None,
530        }
531    }
532}
533
534impl ClientCirc {
535    /// Return a description of the first hop of this circuit.
536    ///
537    /// # Panics
538    ///
539    /// Panics if there is no first hop.  (This should be impossible outside of
540    /// the tor-proto crate, but within the crate it's possible to have a
541    /// circuit with no hops.)
542    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
543        Ok(self
544            .mutable
545            .first_hop(self.unique_id)
546            .map_err(|_| Error::CircuitClosed)?
547            .expect("called first_hop on an un-constructed circuit"))
548    }
549
550    /// Return a description of the last hop of the circuit.
551    ///
552    /// Return None if the last hop is virtual.
553    ///
554    /// See caveats on [`ClientCirc::last_hop_num()`].
555    ///
556    /// # Panics
557    ///
558    /// Panics if there is no last hop.  (This should be impossible outside of
559    /// the tor-proto crate, but within the crate it's possible to have a
560    /// circuit with no hops.)
561    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
562        let path = self.path_ref()?;
563        Ok(path
564            .hops()
565            .last()
566            .expect("Called last_hop an an un-constructed circuit")
567            .as_chan_target()
568            .map(OwnedChanTarget::from_chan_target))
569    }
570
571    /// Return the [`HopNum`] of the last hop of this circuit.
572    ///
573    /// Returns an error if there is no last hop.  (This should be impossible outside of the
574    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
575    ///
576    /// NOTE: This function will return the [`HopNum`] of the hop
577    /// that is _currently_ the last. If there is an extend operation in progress,
578    /// the currently pending hop may or may not be counted, depending on whether
579    /// the extend operation finishes before this call is done.
580    pub fn last_hop_num(&self) -> Result<HopNum> {
581        Ok(self
582            .mutable
583            .last_hop_num(self.unique_id)?
584            .ok_or_else(|| internal!("no last hop index"))?)
585    }
586
587    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
588    /// HopLocation with its id and hop number.
589    ///
590    /// Return an error if there is no last hop.
591    pub fn last_hop(&self) -> Result<TargetHop> {
592        let hop_num = self
593            .mutable
594            .last_hop_num(self.unique_id)?
595            .ok_or_else(|| bad_api_usage!("no last hop"))?;
596        Ok((self.unique_id, hop_num).into())
597    }
598
599    /// Return a [`Path`] object describing all the hops in this circuit.
600    ///
601    /// Note that this `Path` is not automatically updated if the circuit is
602    /// extended.
603    pub fn path_ref(&self) -> Result<Arc<Path>> {
604        self.mutable
605            .path_ref(self.unique_id)
606            .map_err(|_| Error::CircuitClosed)
607    }
608
609    /// Get the clock skew claimed by the first hop of the circuit.
610    ///
611    /// See [`Channel::clock_skew()`].
612    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
613        let (tx, rx) = oneshot::channel();
614
615        self.control
616            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
617            .map_err(|_| Error::CircuitClosed)?;
618
619        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
620    }
621
    /// Return a reference to this circuit's memory quota account
    ///
    /// This only borrows a field of the `ClientCirc`;
    /// it does not communicate with the reactor.
    pub fn mq_account(&self) -> &CircuitAccount {
        &self.memquota
    }
626
627    /// Return the cryptographic material used to prove knowledge of a shared
628    /// secret with with `hop`.
629    ///
630    /// See [`CircuitBinding`] for more information on how this is used.
631    ///
632    /// Return None if we have no circuit binding information for the hop, or if
633    /// the hop does not exist.
634    #[cfg(feature = "hs-service")]
635    pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
636        let (sender, receiver) = oneshot::channel();
637        let msg = CtrlCmd::GetBindingKey { hop, done: sender };
638        self.command
639            .unbounded_send(msg)
640            .map_err(|_| Error::CircuitClosed)?;
641
642        receiver.await.map_err(|_| Error::CircuitClosed)?
643    }
644
645    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
646    ///
647    /// To use this:
648    ///
649    ///  0. Create an inter-task channel you'll use to receive
650    ///     the outcome of your conversation,
651    ///     and bundle it into a [`MsgHandler`].
652    ///
653    ///  1. Call `start_conversation`.
654    ///     This will install a your handler, for incoming messages,
655    ///     and send the outgoing message (if you provided one).
656    ///     After that, each message on the circuit
657    ///     that isn't handled by the core machinery
658    ///     is passed to your provided `reply_handler`.
659    ///
660    ///  2. Possibly call `send_msg` on the [`Conversation`],
661    ///     from the call site of `start_conversation`,
662    ///     possibly multiple times, from time to time,
663    ///     to send further desired messages to the peer.
664    ///
665    ///  3. In your [`MsgHandler`], process the incoming messages.
666    ///     You may respond by
667    ///     sending additional messages
668    ///     When the protocol exchange is finished,
669    ///     `MsgHandler::handle_msg` should return
670    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
671    ///
672    /// If you don't need the `Conversation` to send followup messages,
673    /// you may simply drop it,
674    /// and rely on the responses you get from your handler,
675    /// on the channel from step 0 above.
676    /// Your handler will remain installed and able to process incoming messages
677    /// until it returns `ConversationFinished`.
678    ///
679    /// (If you don't want to accept any replies at all, it may be
680    /// simpler to use [`ClientCirc::send_raw_msg`].)
681    ///
682    /// Note that it is quite possible to use this function to violate the tor
683    /// protocol; most users of this API will not need to call it.  It is used
684    /// to implement most of the onion service handshake.
685    ///
686    /// # Limitations
687    ///
688    /// Only one conversation may be active at any one time,
689    /// for any one circuit.
690    /// This generally means that this function should not be called
691    /// on a circuit which might be shared with anyone else.
692    ///
693    /// Likewise, it is forbidden to try to extend the circuit,
694    /// while the conversation is in progress.
695    ///
696    /// After the conversation has finished, the circuit may be extended.
697    /// Or, `start_conversation` may be called again;
698    /// but, in that case there will be a gap between the two conversations,
699    /// during which no `MsgHandler` is installed,
700    /// and unexpected incoming messages would close the circuit.
701    ///
702    /// If these restrictions are violated, the circuit will be closed with an error.
703    ///
704    /// ## Precise definition of the lifetime of a conversation
705    ///
706    /// A conversation is in progress from entry to `start_conversation`,
707    /// until entry to the body of the [`MsgHandler::handle_msg`]
708    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
709    /// (*Entry* since `handle_msg` is synchronously embedded
710    /// into the incoming message processing.)
711    /// So you may start a new conversation as soon as you have the final response
712    /// via your inter-task channel from (0) above.
713    ///
714    /// The lifetime relationship of the [`Conversation`],
715    /// vs the handler returning `ConversationFinished`
716    /// is not enforced by the type system.
717    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
718    // at least while allowing sending followup messages from outside the handler.
719    //
720    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
721    //   tor-proto interface.
722    #[cfg(feature = "send-control-msg")]
723    pub async fn start_conversation(
724        &self,
725        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
726        reply_handler: impl MsgHandler + Send + 'static,
727        hop: TargetHop,
728    ) -> Result<Conversation<'_>> {
729        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
730        // the right Leg/Hop with inbound cell.
731        let (sender, receiver) = oneshot::channel();
732        self.command
733            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
734            .map_err(|_| Error::CircuitClosed)?;
735        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
736        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
737        let conversation = Conversation(self);
738        conversation.send_internal(msg, Some(handler)).await?;
739        Ok(conversation)
740    }
741
742    /// Send an ad-hoc message to a given hop on the circuit, without expecting
743    /// a reply.
744    ///
745    /// (If you want to handle one or more possible replies, see
746    /// [`ClientCirc::start_conversation`].)
747    #[cfg(feature = "send-control-msg")]
748    pub async fn send_raw_msg(
749        &self,
750        msg: tor_cell::relaycell::msg::AnyRelayMsg,
751        hop: TargetHop,
752    ) -> Result<()> {
753        let (sender, receiver) = oneshot::channel();
754        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
755        self.control
756            .unbounded_send(ctrl_msg)
757            .map_err(|_| Error::CircuitClosed)?;
758
759        receiver.await.map_err(|_| Error::CircuitClosed)?
760    }
761
762    /// Tell this circuit to begin allowing the final hop of the circuit to try
763    /// to create new Tor streams, and to return those pending requests in an
764    /// asynchronous stream.
765    ///
766    /// Ordinarily, these requests are rejected.
767    ///
768    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
769    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
770    /// an error.
771    ///
772    /// After this method has been called on a circuit, the circuit is expected
773    /// to receive requests of this type indefinitely, until it is finally closed.
774    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
775    ///
776    /// Only onion services (and eventually) exit relays should call this
777    /// method.
778    //
779    // TODO: Someday, we might want to allow a stream request handler to be
780    // un-registered.  However, nothing in the Tor protocol requires it.
781    #[cfg(feature = "hs-service")]
782    pub async fn allow_stream_requests(
783        self: &Arc<ClientCirc>,
784        allow_commands: &[tor_cell::relaycell::RelayCmd],
785        hop: TargetHop,
786        filter: impl crate::stream::IncomingStreamRequestFilter,
787    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
788        use futures::stream::StreamExt;
789
790        /// The size of the channel receiving IncomingStreamRequestContexts.
791        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
792
793        // TODO(#2002): support onion service conflux
794        let circ_count = self.mutable.n_legs();
795        if circ_count != 1 {
796            return Err(
797                internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
798            );
799        }
800
801        let time_prov = self.time_provider.clone();
802        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
803        let (incoming_sender, incoming_receiver) =
804            MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
805        let (tx, rx) = oneshot::channel();
806
807        self.command
808            .unbounded_send(CtrlCmd::AwaitStreamRequest {
809                cmd_checker,
810                incoming_sender,
811                hop,
812                done: tx,
813                filter: Box::new(filter),
814            })
815            .map_err(|_| Error::CircuitClosed)?;
816
817        // Check whether the AwaitStreamRequest was processed successfully.
818        rx.await.map_err(|_| Error::CircuitClosed)??;
819
820        let allowed_hop_loc = match hop {
821            TargetHop::Hop(loc) => Some(loc),
822            _ => None,
823        }
824        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
825
826        let circ = Arc::clone(self);
827        Ok(incoming_receiver.map(move |req_ctx| {
828            let StreamReqInfo {
829                req,
830                stream_id,
831                hop,
832                receiver,
833                msg_tx,
834                rate_limit_stream,
835                memquota,
836                relay_cell_format,
837            } = req_ctx;
838
839            // We already enforce this in handle_incoming_stream_request; this
840            // assertion is just here to make sure that we don't ever
841            // accidentally remove or fail to enforce that check, since it is
842            // security-critical.
843            assert_eq!(allowed_hop_loc, hop);
844
845            // TODO(#2002): figure out what this is going to look like
846            // for onion services (perhaps we should forbid this function
847            // from being called on a multipath circuit?)
848            //
849            // See also:
850            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
851            let target = StreamTarget {
852                circ: Arc::clone(&circ),
853                tx: msg_tx,
854                hop: allowed_hop_loc,
855                stream_id,
856                relay_cell_format,
857                rate_limit_stream,
858            };
859
860            let reader = StreamReceiver {
861                target: target.clone(),
862                receiver,
863                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
864                ended: false,
865            };
866
867            IncomingStream::new(circ.time_provider.clone(), req, target, reader, memquota)
868        }))
869    }
870
871    /// Extend the circuit, via the most appropriate circuit extension handshake,
872    /// to the chosen `target` hop.
873    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
874    where
875        Tg: CircTarget,
876    {
877        // For now we use the simplest decision-making mechanism:
878        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
879        //
880        // This behavior is slightly different from C tor, which uses ntor v3
881        // only whenever it want to send any extension in the circuit message.
882        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
883        // want to use an extension if we can, and so it doesn't make too much
884        // sense to detect the case where we have no extensions.
885        //
886        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
887        // on the tor network, and so we cannot simply assume that everybody has it.)
888        if target
889            .protovers()
890            .supports_named_subver(named::RELAY_NTORV3)
891        {
892            self.extend_ntor_v3(target, params).await
893        } else {
894            self.extend_ntor(target, params).await
895        }
896    }
897
898    /// Extend the circuit via the ntor handshake to a new target last
899    /// hop.
900    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
901    where
902        Tg: CircTarget,
903    {
904        let key = NtorPublicKey {
905            id: *target
906                .rsa_identity()
907                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
908            pk: *target.ntor_onion_key(),
909        };
910        let mut linkspecs = target
911            .linkspecs()
912            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
913        if !params.extend_by_ed25519_id {
914            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
915        }
916
917        let (tx, rx) = oneshot::channel();
918
919        let peer_id = OwnedChanTarget::from_chan_target(target);
920        let settings =
921            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
922        self.control
923            .unbounded_send(CtrlMsg::ExtendNtor {
924                peer_id,
925                public_key: key,
926                linkspecs,
927                settings,
928                done: tx,
929            })
930            .map_err(|_| Error::CircuitClosed)?;
931
932        rx.await.map_err(|_| Error::CircuitClosed)??;
933
934        Ok(())
935    }
936
937    /// Extend the circuit via the ntor handshake to a new target last
938    /// hop.
939    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
940    where
941        Tg: CircTarget,
942    {
943        let key = NtorV3PublicKey {
944            id: *target
945                .ed_identity()
946                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
947            pk: *target.ntor_onion_key(),
948        };
949        let mut linkspecs = target
950            .linkspecs()
951            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
952        if !params.extend_by_ed25519_id {
953            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
954        }
955
956        let (tx, rx) = oneshot::channel();
957
958        let peer_id = OwnedChanTarget::from_chan_target(target);
959        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
960        self.control
961            .unbounded_send(CtrlMsg::ExtendNtorV3 {
962                peer_id,
963                public_key: key,
964                linkspecs,
965                settings,
966                done: tx,
967            })
968            .map_err(|_| Error::CircuitClosed)?;
969
970        rx.await.map_err(|_| Error::CircuitClosed)??;
971
972        Ok(())
973    }
974
975    /// Extend this circuit by a single, "virtual" hop.
976    ///
977    /// A virtual hop is one for which we do not add an actual network connection
978    /// between separate hosts (such as Relays).  We only add a layer of
979    /// cryptography.
980    ///
981    /// This is used to implement onion services: the client and the service
982    /// both build a circuit to a single rendezvous point, and tell the
983    /// rendezvous point to relay traffic between their two circuits.  Having
984    /// completed a [`handshake`] out of band[^1], the parties each extend their
985    /// circuits by a single "virtual" encryption hop that represents their
986    /// shared cryptographic context.
987    ///
988    /// Once a circuit has been extended in this way, it is an error to try to
989    /// extend it in any other way.
990    ///
991    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
992    ///     client sends their half of the handshake in an ` message, and the
993    ///     service's response is inline in its `RENDEZVOUS2` message.
994    //
995    // TODO hs: let's try to enforce the "you can't extend a circuit again once
996    // it has been extended this way" property.  We could do that with internal
997    // state, or some kind of a type state pattern.
998    #[cfg(feature = "hs-common")]
999    pub async fn extend_virtual(
1000        &self,
1001        protocol: handshake::RelayProtocol,
1002        role: handshake::HandshakeRole,
1003        seed: impl handshake::KeyGenerator,
1004        params: &CircParameters,
1005        capabilities: &tor_protover::Protocols,
1006    ) -> Result<()> {
1007        use self::handshake::BoxedClientLayer;
1008
1009        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
1010        let relay_cell_format = protocol.relay_cell_format();
1011
1012        let BoxedClientLayer { fwd, back, binding } =
1013            protocol.construct_client_layers(role, seed)?;
1014
1015        let settings = HopSettings::from_params_and_caps(params, capabilities)?
1016            // TODO #2037: We _should_ support negotiation here; see #2037.
1017            .without_negotiation();
1018        let (tx, rx) = oneshot::channel();
1019        let message = CtrlCmd::ExtendVirtual {
1020            relay_cell_format,
1021            cell_crypto: (fwd, back, binding),
1022            settings,
1023            done: tx,
1024        };
1025
1026        self.command
1027            .unbounded_send(message)
1028            .map_err(|_| Error::CircuitClosed)?;
1029
1030        rx.await.map_err(|_| Error::CircuitClosed)?
1031    }
1032
1033    /// Helper, used to begin a stream.
1034    ///
1035    /// This function allocates a stream ID, and sends the message
1036    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
1037    ///
1038    /// The caller will typically want to see the first cell in response,
1039    /// to see whether it is e.g. an END or a CONNECTED.
1040    async fn begin_stream_impl(
1041        self: &Arc<ClientCirc>,
1042        begin_msg: AnyRelayMsg,
1043        cmd_checker: AnyCmdChecker,
1044    ) -> Result<(StreamReceiver, StreamTarget, StreamAccount)> {
1045        // TODO: Possibly this should take a hop, rather than just
1046        // assuming it's the last hop.
1047        let hop = TargetHop::LastHop;
1048
1049        let time_prov = self.time_provider.clone();
1050
1051        let memquota = StreamAccount::new(self.mq_account())?;
1052        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
1053            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
1054        let (tx, rx) = oneshot::channel();
1055        let (msg_tx, msg_rx) =
1056            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
1057
1058        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
1059
1060        self.control
1061            .unbounded_send(CtrlMsg::BeginStream {
1062                hop,
1063                message: begin_msg,
1064                sender,
1065                rx: msg_rx,
1066                rate_limit_notifier: rate_limit_tx,
1067                done: tx,
1068                cmd_checker,
1069            })
1070            .map_err(|_| Error::CircuitClosed)?;
1071
1072        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
1073
1074        let target = StreamTarget {
1075            circ: self.clone(),
1076            tx: msg_tx,
1077            hop,
1078            stream_id,
1079            relay_cell_format,
1080            rate_limit_stream: rate_limit_rx,
1081        };
1082
1083        let stream_receiver = StreamReceiver {
1084            target: target.clone(),
1085            receiver,
1086            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
1087            ended: false,
1088        };
1089
1090        Ok((stream_receiver, target, memquota))
1091    }
1092
1093    /// Start a DataStream (anonymized connection) to the given
1094    /// address and port, using a BEGIN cell.
1095    async fn begin_data_stream(
1096        self: &Arc<ClientCirc>,
1097        msg: AnyRelayMsg,
1098        optimistic: bool,
1099    ) -> Result<DataStream> {
1100        let (stream_receiver, target, memquota) = self
1101            .begin_stream_impl(msg, DataCmdChecker::new_any())
1102            .await?;
1103        let mut stream = DataStream::new(
1104            self.time_provider.clone(),
1105            stream_receiver,
1106            target,
1107            memquota,
1108        );
1109        if !optimistic {
1110            stream.wait_for_connection().await?;
1111        }
1112        Ok(stream)
1113    }
1114
1115    /// Start a stream to the given address and port, using a BEGIN
1116    /// cell.
1117    ///
1118    /// The use of a string for the address is intentional: you should let
1119    /// the remote Tor relay do the hostname lookup for you.
1120    pub async fn begin_stream(
1121        self: &Arc<ClientCirc>,
1122        target: &str,
1123        port: u16,
1124        parameters: Option<StreamParameters>,
1125    ) -> Result<DataStream> {
1126        let parameters = parameters.unwrap_or_default();
1127        let begin_flags = parameters.begin_flags();
1128        let optimistic = parameters.is_optimistic();
1129        let target = if parameters.suppressing_hostname() {
1130            ""
1131        } else {
1132            target
1133        };
1134        let beginmsg = Begin::new(target, port, begin_flags)
1135            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
1136        self.begin_data_stream(beginmsg.into(), optimistic).await
1137    }
1138
1139    /// Start a new stream to the last relay in the circuit, using
1140    /// a BEGIN_DIR cell.
1141    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
1142        // Note that we always open begindir connections optimistically.
1143        // Since they are local to a relay that we've already authenticated
1144        // with and built a circuit to, there should be no additional checks
1145        // we need to perform to see whether the BEGINDIR will succeed.
1146        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
1147            .await
1148    }
1149
1150    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
1151    /// in this circuit.
1152    ///
1153    /// Note that this function does not check for timeouts; that's
1154    /// the caller's responsibility.
1155    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
1156        let resolve_msg = Resolve::new(hostname);
1157
1158        let resolved_msg = self.try_resolve(resolve_msg).await?;
1159
1160        resolved_msg
1161            .into_answers()
1162            .into_iter()
1163            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1164                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1165                Ok(_) => None,
1166                Err(e) => Some(Err(e)),
1167            })
1168            .collect()
1169    }
1170
1171    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
1172    /// the last relay on this circuit.
1173    ///
1174    /// Note that this function does not check for timeouts; that's
1175    /// the caller's responsibility.
1176    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1177        let resolve_ptr_msg = Resolve::new_reverse(&addr);
1178
1179        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1180
1181        resolved_msg
1182            .into_answers()
1183            .into_iter()
1184            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1185                Ok(ResolvedVal::Hostname(v)) => Some(
1186                    String::from_utf8(v)
1187                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1188                ),
1189                Ok(_) => None,
1190                Err(e) => Some(Err(e)),
1191            })
1192            .collect()
1193    }
1194
1195    /// Helper: Send the resolve message, and read resolved message from
1196    /// resolve stream.
1197    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1198        let (stream_receiver, _target, memquota) = self
1199            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1200            .await?;
1201        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
1202        resolve_stream.read_msg().await
1203    }
1204
1205    /// Shut down this circuit, along with all streams that are using it.
1206    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
1207    /// immediately after this function returns!).
1208    ///
1209    /// Note that other references to this circuit may exist.  If they
1210    /// do, they will stop working after you call this function.
1211    ///
1212    /// It's not necessary to call this method if you're just done
1213    /// with a circuit: the circuit should close on its own once nothing
1214    /// is using it any more.
1215    pub fn terminate(&self) {
1216        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
1217    }
1218
1219    /// Called when a circuit-level protocol error has occurred and the
1220    /// circuit needs to shut down.
1221    ///
1222    /// This is a separate function because we may eventually want to have
1223    /// it do more than just shut down.
1224    ///
1225    /// As with `terminate`, this function is asynchronous.
1226    pub(crate) fn protocol_error(&self) {
1227        self.terminate();
1228    }
1229
1230    /// Return true if this circuit is closed and therefore unusable.
1231    pub fn is_closing(&self) -> bool {
1232        self.control.is_closed()
1233    }
1234
1235    /// Return a process-unique identifier for this circuit.
1236    pub fn unique_id(&self) -> UniqId {
1237        self.unique_id
1238    }
1239
1240    /// Return the number of hops in this circuit.
1241    ///
1242    /// NOTE: This function will currently return only the number of hops
1243    /// _currently_ in the circuit. If there is an extend operation in progress,
1244    /// the currently pending hop may or may not be counted, depending on whether
1245    /// the extend operation finishes before this call is done.
1246    pub fn n_hops(&self) -> Result<usize> {
1247        self.mutable
1248            .n_hops(self.unique_id)
1249            .map_err(|_| Error::CircuitClosed)
1250    }
1251
1252    /// Return a future that will resolve once this circuit has closed.
1253    ///
1254    /// Note that this method does not itself cause the circuit to shut down.
1255    ///
1256    /// TODO: Perhaps this should return some kind of status indication instead
1257    /// of just ()
1258    #[cfg(feature = "experimental-api")]
1259    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
1260        self.reactor_closed_rx.clone().map(|_| ())
1261    }
1262}
1263
/// Handle for an ongoing ad-hoc protocol exchange over a circuit
///
/// A `Conversation` is returned by [`ClientCirc::start_conversation`].
/// It borrows the circuit it was started on, and is used to send
/// follow-up messages to the last hop relay while the exchange is running.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);

#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send one more protocol message as part of this exchange
    ///
    /// No new handler is installed: replies keep going to the `MsgHandler`
    /// that was set up when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// Both parts are optional: `msg` is the message to send (if any) and
    /// `handler` the meta-cell handler to install (if any).
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let (outcome_tx, outcome_rx) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            // The message is wrapped without a stream ID (`None`).
            msg: msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner)),
            handler,
            sender: outcome_tx,
        };

        let Conversation(circ) = self;
        circ.control
            .unbounded_send(request)
            .map_err(|_| Error::CircuitClosed)?;

        match outcome_rx.await {
            // The reactor answered: pass its verdict through unchanged.
            Ok(outcome) => outcome,
            // The reply channel was dropped without an answer.
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
1310
1311impl PendingClientCirc {
1312    /// Instantiate a new circuit object: used from Channel::new_circ().
1313    ///
1314    /// Does not send a CREATE* cell on its own.
1315    ///
1316    ///
1317    pub(crate) fn new(
1318        id: CircId,
1319        channel: Arc<Channel>,
1320        createdreceiver: oneshot::Receiver<CreateResponse>,
1321        input: CircuitRxReceiver,
1322        unique_id: UniqId,
1323        runtime: DynTimeProvider,
1324        memquota: CircuitAccount,
1325    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1326        let time_provider = channel.time_provider().clone();
1327        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1328            Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1329
1330        let circuit = ClientCirc {
1331            mutable,
1332            unique_id,
1333            control: control_tx,
1334            command: command_tx,
1335            reactor_closed_rx: reactor_closed_rx.shared(),
1336            #[cfg(test)]
1337            circid: id,
1338            memquota,
1339            time_provider,
1340        };
1341
1342        let pending = PendingClientCirc {
1343            recvcreated: createdreceiver,
1344            circ: Arc::new(circuit),
1345        };
1346        (pending, reactor)
1347    }
1348
1349    /// Extract the process-unique identifier for this pending circuit.
1350    pub fn peek_unique_id(&self) -> UniqId {
1351        self.circ.unique_id
1352    }
1353
1354    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1355    /// first hop of this circuit.
1356    ///
1357    /// There's no authentication in CRATE_FAST,
1358    /// so we don't need to know whom we're connecting to: we're just
1359    /// connecting to whichever relay the channel is for.
1360    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1361        // We no nothing about this relay, so we assume it supports no protocol capabilities at all.
1362        //
1363        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
1364        let protocols = tor_protover::Protocols::new();
1365        let settings =
1366            HopSettings::from_params_and_caps(&params, &protocols)?.without_negotiation();
1367
1368        let (tx, rx) = oneshot::channel();
1369        self.circ
1370            .control
1371            .unbounded_send(CtrlMsg::Create {
1372                recv_created: self.recvcreated,
1373                handshake: CircuitHandshake::CreateFast,
1374                settings,
1375                done: tx,
1376            })
1377            .map_err(|_| Error::CircuitClosed)?;
1378
1379        rx.await.map_err(|_| Error::CircuitClosed)??;
1380
1381        Ok(self.circ)
1382    }
1383
1384    /// Use the most appropriate handshake to connect to the first hop of this circuit.
1385    ///
1386    /// Note that the provided 'target' must match the channel's target,
1387    /// or the handshake will fail.
1388    pub async fn create_firsthop<Tg>(
1389        self,
1390        target: &Tg,
1391        params: CircParameters,
1392    ) -> Result<Arc<ClientCirc>>
1393    where
1394        Tg: tor_linkspec::CircTarget,
1395    {
1396        // (See note in ClientCirc::extend.)
1397        if target
1398            .protovers()
1399            .supports_named_subver(named::RELAY_NTORV3)
1400        {
1401            self.create_firsthop_ntor_v3(target, params).await
1402        } else {
1403            self.create_firsthop_ntor(target, params).await
1404        }
1405    }
1406
1407    /// Use the ntor handshake to connect to the first hop of this circuit.
1408    ///
1409    /// Note that the provided 'target' must match the channel's target,
1410    /// or the handshake will fail.
1411    pub async fn create_firsthop_ntor<Tg>(
1412        self,
1413        target: &Tg,
1414        params: CircParameters,
1415    ) -> Result<Arc<ClientCirc>>
1416    where
1417        Tg: tor_linkspec::CircTarget,
1418    {
1419        let (tx, rx) = oneshot::channel();
1420        let settings =
1421            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
1422
1423        self.circ
1424            .control
1425            .unbounded_send(CtrlMsg::Create {
1426                recv_created: self.recvcreated,
1427                handshake: CircuitHandshake::Ntor {
1428                    public_key: NtorPublicKey {
1429                        id: *target
1430                            .rsa_identity()
1431                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1432                        pk: *target.ntor_onion_key(),
1433                    },
1434                    ed_identity: *target
1435                        .ed_identity()
1436                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1437                },
1438                settings,
1439                done: tx,
1440            })
1441            .map_err(|_| Error::CircuitClosed)?;
1442
1443        rx.await.map_err(|_| Error::CircuitClosed)??;
1444
1445        Ok(self.circ)
1446    }
1447
1448    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
1449    ///
1450    /// Assumes that the target supports ntor_v3. The caller should verify
1451    /// this before calling this function, e.g. by validating that the target
1452    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
1453    ///
1454    /// Note that the provided 'target' must match the channel's target,
1455    /// or the handshake will fail.
1456    pub async fn create_firsthop_ntor_v3<Tg>(
1457        self,
1458        target: &Tg,
1459        params: CircParameters,
1460    ) -> Result<Arc<ClientCirc>>
1461    where
1462        Tg: tor_linkspec::CircTarget,
1463    {
1464        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
1465        let (tx, rx) = oneshot::channel();
1466
1467        self.circ
1468            .control
1469            .unbounded_send(CtrlMsg::Create {
1470                recv_created: self.recvcreated,
1471                handshake: CircuitHandshake::NtorV3 {
1472                    public_key: NtorV3PublicKey {
1473                        id: *target
1474                            .ed_identity()
1475                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1476                        pk: *target.ntor_onion_key(),
1477                    },
1478                },
1479                settings,
1480                done: tx,
1481            })
1482            .map_err(|_| Error::CircuitClosed)?;
1483
1484        rx.await.map_err(|_| Error::CircuitClosed)??;
1485
1486        Ok(self.circ)
1487    }
1488}
1489
1490/// Convert a [`ResolvedVal`] into a Result, based on whether or not
1491/// it represents an error.
1492fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1493    match val {
1494        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1495        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1496        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1497        _ => Ok(val),
1498    }
1499}
1500
1501#[cfg(test)]
1502pub(crate) mod test {
1503    // @@ begin test lint list maintained by maint/add_warning @@
1504    #![allow(clippy::bool_assert_comparison)]
1505    #![allow(clippy::clone_on_copy)]
1506    #![allow(clippy::dbg_macro)]
1507    #![allow(clippy::mixed_attributes_style)]
1508    #![allow(clippy::print_stderr)]
1509    #![allow(clippy::print_stdout)]
1510    #![allow(clippy::single_char_pattern)]
1511    #![allow(clippy::unwrap_used)]
1512    #![allow(clippy::unchecked_duration_subtraction)]
1513    #![allow(clippy::useless_vec)]
1514    #![allow(clippy::needless_pass_by_value)]
1515    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1516
1517    use super::*;
1518    use crate::channel::OpenChanCellS2C;
1519    use crate::channel::{test::new_reactor, CodecError};
1520    use crate::congestion::test_utils::params::build_cc_vegas_params;
1521    use crate::crypto::cell::RelayCellBody;
1522    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1523    #[cfg(feature = "hs-service")]
1524    use crate::stream::IncomingStreamRequestFilter;
1525    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1526    use futures::channel::mpsc::{Receiver, Sender};
1527    use futures::io::{AsyncReadExt, AsyncWriteExt};
1528    use futures::sink::SinkExt;
1529    use futures::stream::StreamExt;
1530    use futures::task::SpawnExt;
1531    use hex_literal::hex;
1532    use std::collections::{HashMap, VecDeque};
1533    use std::fmt::Debug;
1534    use std::time::Duration;
1535    use tor_basic_utils::test_rng::testing_rng;
1536    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
1537    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1538    use tor_cell::relaycell::msg::SendmeTag;
1539    use tor_cell::relaycell::{
1540        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1541    };
1542    use tor_linkspec::OwnedCircTarget;
1543    use tor_memquota::HasMemoryCost;
1544    use tor_rtcompat::Runtime;
1545    use tracing::trace;
1546    use tracing_test::traced_test;
1547
1548    #[cfg(feature = "conflux")]
1549    use {
1550        crate::tunnel::reactor::ConfluxHandshakeResult,
1551        crate::util::err::ConfluxHandshakeError,
1552        futures::lock::Mutex as AsyncMutex,
1553        std::result::Result as StdResult,
1554        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1555        tor_cell::relaycell::msg::ConfluxLink,
1556        tor_rtmock::MockRuntime,
1557    };
1558
1559    impl PendingClientCirc {
1560        /// Testing only: Extract the circuit ID for this pending circuit.
1561        pub(crate) fn peek_circid(&self) -> CircId {
1562            self.circ.circid
1563        }
1564    }
1565
1566    impl ClientCirc {
1567        /// Testing only: Extract the circuit ID of this circuit.
1568        pub(crate) fn peek_circid(&self) -> CircId {
1569            self.circid
1570        }
1571    }
1572
1573    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1574        // TODO #1947: test other formats.
1575        let rfmt = RelayCellFormat::V0;
1576        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1577            .encode(rfmt, &mut testing_rng())
1578            .unwrap();
1579        let chanmsg = chanmsg::Relay::from(body);
1580        ClientCircChanMsg::Relay(chanmsg)
1581    }
1582
1583    // Example relay IDs and keys
1584    const EXAMPLE_SK: [u8; 32] =
1585        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1586    const EXAMPLE_PK: [u8; 32] =
1587        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1588    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1589    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1590
1591    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
1592    #[cfg(test)]
1593    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
1594        buffer: usize,
1595    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
1596        crate::fake_mpsc(buffer)
1597    }
1598
1599    /// return an example OwnedCircTarget that can get used for an ntor handshake.
1600    fn example_target() -> OwnedCircTarget {
1601        let mut builder = OwnedCircTarget::builder();
1602        builder
1603            .chan_target()
1604            .ed_identity(EXAMPLE_ED_ID.into())
1605            .rsa_identity(EXAMPLE_RSA_ID.into());
1606        builder
1607            .ntor_onion_key(EXAMPLE_PK.into())
1608            .protocols("FlowCtrl=1-2".parse().unwrap())
1609            .build()
1610            .unwrap()
1611    }
1612    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1613        crate::crypto::handshake::ntor::NtorSecretKey::new(
1614            EXAMPLE_SK.into(),
1615            EXAMPLE_PK.into(),
1616            EXAMPLE_RSA_ID.into(),
1617        )
1618    }
1619    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1620        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1621            EXAMPLE_SK.into(),
1622            EXAMPLE_PK.into(),
1623            EXAMPLE_ED_ID.into(),
1624        )
1625    }
1626
1627    fn working_fake_channel<R: Runtime>(
1628        rt: &R,
1629    ) -> (
1630        Arc<Channel>,
1631        Receiver<AnyChanCell>,
1632        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1633    ) {
1634        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1635        rt.spawn(async {
1636            let _ignore = chan_reactor.run().await;
1637        })
1638        .unwrap();
1639        (channel, rx, tx)
1640    }
1641
1642    /// Which handshake type to use.
1643    #[derive(Copy, Clone)]
1644    enum HandshakeType {
1645        Fast,
1646        Ntor,
1647        NtorV3,
1648    }
1649
1650    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1651        // We want to try progressing from a pending circuit to a circuit
1652        // via a crate_fast handshake.
1653
1654        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1655
1656        let (chan, mut rx, _sink) = working_fake_channel(rt);
1657        let circid = CircId::new(128).unwrap();
1658        let (created_send, created_recv) = oneshot::channel();
1659        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1660        let unique_id = UniqId::new(23, 17);
1661
1662        let (pending, reactor) = PendingClientCirc::new(
1663            circid,
1664            chan,
1665            created_recv,
1666            circmsg_recv,
1667            unique_id,
1668            DynTimeProvider::new(rt.clone()),
1669            CircuitAccount::new_noop(),
1670        );
1671
1672        rt.spawn(async {
1673            let _ignore = reactor.run().await;
1674        })
1675        .unwrap();
1676
1677        // Future to pretend to be a relay on the other end of the circuit.
1678        let simulate_relay_fut = async move {
1679            let mut rng = testing_rng();
1680            let create_cell = rx.next().await.unwrap();
1681            assert_eq!(create_cell.circid(), Some(circid));
1682            let reply = match handshake_type {
1683                HandshakeType::Fast => {
1684                    let cf = match create_cell.msg() {
1685                        AnyChanMsg::CreateFast(cf) => cf,
1686                        other => panic!("{:?}", other),
1687                    };
1688                    let (_, rep) = CreateFastServer::server(
1689                        &mut rng,
1690                        &mut |_: &()| Some(()),
1691                        &[()],
1692                        cf.handshake(),
1693                    )
1694                    .unwrap();
1695                    CreateResponse::CreatedFast(CreatedFast::new(rep))
1696                }
1697                HandshakeType::Ntor => {
1698                    let c2 = match create_cell.msg() {
1699                        AnyChanMsg::Create2(c2) => c2,
1700                        other => panic!("{:?}", other),
1701                    };
1702                    let (_, rep) = NtorServer::server(
1703                        &mut rng,
1704                        &mut |_: &()| Some(()),
1705                        &[example_ntor_key()],
1706                        c2.body(),
1707                    )
1708                    .unwrap();
1709                    CreateResponse::Created2(Created2::new(rep))
1710                }
1711                HandshakeType::NtorV3 => {
1712                    let c2 = match create_cell.msg() {
1713                        AnyChanMsg::Create2(c2) => c2,
1714                        other => panic!("{:?}", other),
1715                    };
1716                    let mut reply_fn = if with_cc {
1717                        |client_exts: &[CircRequestExt]| {
1718                            let _ = client_exts
1719                                .iter()
1720                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1721                                .expect("Client failed to request CC");
1722                            // This needs to be aligned to test_utils params
1723                            // value due to validation that needs it in range.
1724                            Some(vec![CircResponseExt::CcResponse(
1725                                extend_ext::CcResponse::new(31),
1726                            )])
1727                        }
1728                    } else {
1729                        |_: &_| Some(vec![])
1730                    };
1731                    let (_, rep) = NtorV3Server::server(
1732                        &mut rng,
1733                        &mut reply_fn,
1734                        &[example_ntor_v3_key()],
1735                        c2.body(),
1736                    )
1737                    .unwrap();
1738                    CreateResponse::Created2(Created2::new(rep))
1739                }
1740            };
1741            created_send.send(reply).unwrap();
1742        };
1743        // Future to pretend to be a client.
1744        let client_fut = async move {
1745            let target = example_target();
1746            let params = CircParameters::default();
1747            let ret = match handshake_type {
1748                HandshakeType::Fast => {
1749                    trace!("doing fast create");
1750                    pending.create_firsthop_fast(params).await
1751                }
1752                HandshakeType::Ntor => {
1753                    trace!("doing ntor create");
1754                    pending.create_firsthop_ntor(&target, params).await
1755                }
1756                HandshakeType::NtorV3 => {
1757                    let params = if with_cc {
1758                        // Setup CC vegas parameters.
1759                        CircParameters::new(true, build_cc_vegas_params())
1760                    } else {
1761                        params
1762                    };
1763                    trace!("doing ntor_v3 create");
1764                    pending.create_firsthop_ntor_v3(&target, params).await
1765                }
1766            };
1767            trace!("create done: result {:?}", ret);
1768            ret
1769        };
1770
1771        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1772
1773        let _circ = circ.unwrap();
1774
1775        // pfew!  We've build a circuit!  Let's make sure it has one hop.
1776        assert_eq!(_circ.n_hops().unwrap(), 1);
1777    }
1778
1779    #[traced_test]
1780    #[test]
1781    fn test_create_fast() {
1782        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1783            test_create(&rt, HandshakeType::Fast, false).await;
1784        });
1785    }
1786    #[traced_test]
1787    #[test]
1788    fn test_create_ntor() {
1789        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1790            test_create(&rt, HandshakeType::Ntor, false).await;
1791        });
1792    }
1793    #[traced_test]
1794    #[test]
1795    fn test_create_ntor_v3() {
1796        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1797            test_create(&rt, HandshakeType::NtorV3, false).await;
1798        });
1799    }
1800    #[traced_test]
1801    #[test]
1802    #[cfg(feature = "flowctl-cc")]
1803    fn test_create_ntor_v3_with_cc() {
1804        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1805            test_create(&rt, HandshakeType::NtorV3, true).await;
1806        });
1807    }
1808
1809    // An encryption layer that doesn't do any crypto.   Can be used
1810    // as inbound or outbound, but not both at once.
1811    pub(crate) struct DummyCrypto {
1812        counter_tag: [u8; 20],
1813        counter: u32,
1814        lasthop: bool,
1815    }
1816    impl DummyCrypto {
1817        fn next_tag(&mut self) -> SendmeTag {
1818            #![allow(clippy::identity_op)]
1819            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1820            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1821            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1822            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1823            self.counter += 1;
1824            self.counter_tag.into()
1825        }
1826    }
1827
1828    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1829        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1830            self.next_tag()
1831        }
1832        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1833    }
1834    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1835        fn decrypt_inbound(
1836            &mut self,
1837            _cmd: ChanCmd,
1838            _cell: &mut RelayCellBody,
1839        ) -> Option<SendmeTag> {
1840            if self.lasthop {
1841                Some(self.next_tag())
1842            } else {
1843                None
1844            }
1845        }
1846    }
1847    impl DummyCrypto {
1848        pub(crate) fn new(lasthop: bool) -> Self {
1849            DummyCrypto {
1850                counter_tag: [0; 20],
1851                counter: 0,
1852                lasthop,
1853            }
1854        }
1855    }
1856
1857    // Helper: set up a 3-hop circuit with no encryption, where the
1858    // next inbound message seems to come from hop next_msg_from
1859    async fn newcirc_ext<R: Runtime>(
1860        rt: &R,
1861        unique_id: UniqId,
1862        chan: Arc<Channel>,
1863        hops: Vec<path::HopDetail>,
1864        next_msg_from: HopNum,
1865        params: CircParameters,
1866    ) -> (Arc<ClientCirc>, CircuitRxSender) {
1867        let circid = CircId::new(128).unwrap();
1868        let (_created_send, created_recv) = oneshot::channel();
1869        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1870
1871        let (pending, reactor) = PendingClientCirc::new(
1872            circid,
1873            chan,
1874            created_recv,
1875            circmsg_recv,
1876            unique_id,
1877            DynTimeProvider::new(rt.clone()),
1878            CircuitAccount::new_noop(),
1879        );
1880
1881        rt.spawn(async {
1882            let _ignore = reactor.run().await;
1883        })
1884        .unwrap();
1885
1886        let PendingClientCirc {
1887            circ,
1888            recvcreated: _,
1889        } = pending;
1890
1891        // TODO #1067: Support other formats
1892        let relay_cell_format = RelayCellFormat::V0;
1893
1894        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1895        for (idx, peer_id) in hops.into_iter().enumerate() {
1896            let (tx, rx) = oneshot::channel();
1897            let idx = idx as u8;
1898
1899            circ.command
1900                .unbounded_send(CtrlCmd::AddFakeHop {
1901                    relay_cell_format,
1902                    fwd_lasthop: idx == last_hop_num,
1903                    rev_lasthop: idx == u8::from(next_msg_from),
1904                    peer_id,
1905                    params: params.clone(),
1906                    done: tx,
1907                })
1908                .unwrap();
1909            rx.await.unwrap().unwrap();
1910        }
1911
1912        (circ, circmsg_send)
1913    }
1914
1915    // Helper: set up a 3-hop circuit with no encryption, where the
1916    // next inbound message seems to come from hop next_msg_from
1917    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1918        let hops = std::iter::repeat_with(|| {
1919            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1920                .ed_identity([4; 32].into())
1921                .rsa_identity([5; 20].into())
1922                .build()
1923                .expect("Could not construct fake hop");
1924
1925            path::HopDetail::Relay(peer_id)
1926        })
1927        .take(3)
1928        .collect();
1929
1930        let unique_id = UniqId::new(23, 17);
1931        newcirc_ext(
1932            rt,
1933            unique_id,
1934            chan,
1935            hops,
1936            2.into(),
1937            CircParameters::default(),
1938        )
1939        .await
1940    }
1941
1942    /// Create `n` distinct [`path::HopDetail`]s,
1943    /// with the specified `start_idx` for the dummy identities.
1944    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1945        (0..n)
1946            .map(|idx| {
1947                let peer_id = tor_linkspec::OwnedChanTarget::builder()
1948                    .ed_identity([idx + start_idx; 32].into())
1949                    .rsa_identity([idx + start_idx + 1; 20].into())
1950                    .build()
1951                    .expect("Could not construct fake hop");
1952
1953                path::HopDetail::Relay(peer_id)
1954            })
1955            .collect()
1956    }
1957
1958    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1959        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1960
1961        let (chan, mut rx, _sink) = working_fake_channel(rt);
1962        let (circ, mut sink) = newcirc(rt, chan).await;
1963        let circid = circ.peek_circid();
1964        let params = CircParameters::default();
1965
1966        let extend_fut = async move {
1967            let target = example_target();
1968            match handshake_type {
1969                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1970                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1971                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1972            };
1973            circ // gotta keep the circ alive, or the reactor would exit.
1974        };
1975        let reply_fut = async move {
1976            // We've disabled encryption on this circuit, so we can just
1977            // read the extend2 cell.
1978            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1979            assert_eq!(id, Some(circid));
1980            let rmsg = match chmsg {
1981                AnyChanMsg::RelayEarly(r) => {
1982                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1983                        .unwrap()
1984                }
1985                other => panic!("{:?}", other),
1986            };
1987            let e2 = match rmsg.msg() {
1988                AnyRelayMsg::Extend2(e2) => e2,
1989                other => panic!("{:?}", other),
1990            };
1991            let mut rng = testing_rng();
1992            let reply = match handshake_type {
1993                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1994                HandshakeType::Ntor => {
1995                    let (_keygen, reply) = NtorServer::server(
1996                        &mut rng,
1997                        &mut |_: &()| Some(()),
1998                        &[example_ntor_key()],
1999                        e2.handshake(),
2000                    )
2001                    .unwrap();
2002                    reply
2003                }
2004                HandshakeType::NtorV3 => {
2005                    let (_keygen, reply) = NtorV3Server::server(
2006                        &mut rng,
2007                        &mut |_: &[CircRequestExt]| Some(vec![]),
2008                        &[example_ntor_v3_key()],
2009                        e2.handshake(),
2010                    )
2011                    .unwrap();
2012                    reply
2013                }
2014            };
2015
2016            let extended2 = relaymsg::Extended2::new(reply).into();
2017            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
2018            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
2019        };
2020
2021        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
2022
2023        // Did we really add another hop?
2024        assert_eq!(circ.n_hops().unwrap(), 4);
2025
2026        // Do the path accessors report a reasonable outcome?
2027        {
2028            let path = circ.path_ref().unwrap();
2029            let path = path
2030                .all_hops()
2031                .filter_map(|hop| match hop {
2032                    path::HopDetail::Relay(r) => Some(r),
2033                    #[cfg(feature = "hs-common")]
2034                    path::HopDetail::Virtual => None,
2035                })
2036                .collect::<Vec<_>>();
2037
2038            assert_eq!(path.len(), 4);
2039            use tor_linkspec::HasRelayIds;
2040            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
2041            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
2042        }
2043        {
2044            let path = circ.path_ref().unwrap();
2045            assert_eq!(path.n_hops(), 4);
2046            use tor_linkspec::HasRelayIds;
2047            assert_eq!(
2048                path.hops()[3].as_chan_target().unwrap().ed_identity(),
2049                example_target().ed_identity()
2050            );
2051            assert_ne!(
2052                path.hops()[0].as_chan_target().unwrap().ed_identity(),
2053                example_target().ed_identity()
2054            );
2055        }
2056    }
2057
2058    #[traced_test]
2059    #[test]
2060    fn test_extend_ntor() {
2061        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2062            test_extend(&rt, HandshakeType::Ntor).await;
2063        });
2064    }
2065
2066    #[traced_test]
2067    #[test]
2068    fn test_extend_ntor_v3() {
2069        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2070            test_extend(&rt, HandshakeType::NtorV3).await;
2071        });
2072    }
2073
2074    async fn bad_extend_test_impl<R: Runtime>(
2075        rt: &R,
2076        reply_hop: HopNum,
2077        bad_reply: ClientCircChanMsg,
2078    ) -> Error {
2079        let (chan, _rx, _sink) = working_fake_channel(rt);
2080        let hops = std::iter::repeat_with(|| {
2081            let peer_id = tor_linkspec::OwnedChanTarget::builder()
2082                .ed_identity([4; 32].into())
2083                .rsa_identity([5; 20].into())
2084                .build()
2085                .expect("Could not construct fake hop");
2086
2087            path::HopDetail::Relay(peer_id)
2088        })
2089        .take(3)
2090        .collect();
2091
2092        let unique_id = UniqId::new(23, 17);
2093        let (circ, mut sink) = newcirc_ext(
2094            rt,
2095            unique_id,
2096            chan,
2097            hops,
2098            reply_hop,
2099            CircParameters::default(),
2100        )
2101        .await;
2102        let params = CircParameters::default();
2103
2104        let target = example_target();
2105        #[allow(clippy::clone_on_copy)]
2106        let rtc = rt.clone();
2107        let sink_handle = rt
2108            .spawn_with_handle(async move {
2109                rtc.sleep(Duration::from_millis(100)).await;
2110                sink.send(bad_reply).await.unwrap();
2111                sink
2112            })
2113            .unwrap();
2114        let outcome = circ.extend_ntor(&target, params).await;
2115        let _sink = sink_handle.await;
2116
2117        assert_eq!(circ.n_hops().unwrap(), 3);
2118        assert!(outcome.is_err());
2119        outcome.unwrap_err()
2120    }
2121
2122    #[traced_test]
2123    #[test]
2124    fn bad_extend_wronghop() {
2125        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2126            let extended2 = relaymsg::Extended2::new(vec![]).into();
2127            let cc = rmsg_to_ccmsg(None, extended2);
2128
2129            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
2130            // This case shows up as a CircDestroy, since a message sent
2131            // from the wrong hop won't even be delivered to the extend
2132            // code's meta-handler.  Instead the unexpected message will cause
2133            // the circuit to get torn down.
2134            match error {
2135                Error::CircuitClosed => {}
2136                x => panic!("got other error: {}", x),
2137            }
2138        });
2139    }
2140
2141    #[traced_test]
2142    #[test]
2143    fn bad_extend_wrongtype() {
2144        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2145            let extended = relaymsg::Extended::new(vec![7; 200]).into();
2146            let cc = rmsg_to_ccmsg(None, extended);
2147
2148            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2149            match error {
2150                Error::BytesErr {
2151                    err: tor_bytes::Error::InvalidMessage(_),
2152                    object: "extended2 message",
2153                } => {}
2154                other => panic!("{:?}", other),
2155            }
2156        });
2157    }
2158
2159    #[traced_test]
2160    #[test]
2161    fn bad_extend_destroy() {
2162        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2163            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
2164            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2165            match error {
2166                Error::CircuitClosed => {}
2167                other => panic!("{:?}", other),
2168            }
2169        });
2170    }
2171
2172    #[traced_test]
2173    #[test]
2174    fn bad_extend_crypto() {
2175        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2176            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
2177            let cc = rmsg_to_ccmsg(None, extended2);
2178            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2179            assert!(matches!(error, Error::BadCircHandshakeAuth));
2180        });
2181    }
2182
2183    #[traced_test]
2184    #[test]
2185    fn begindir() {
2186        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2187            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2188            let (circ, mut sink) = newcirc(&rt, chan).await;
2189            let circid = circ.peek_circid();
2190
2191            let begin_and_send_fut = async move {
2192                // Here we'll say we've got a circuit, and we want to
2193                // make a simple BEGINDIR request with it.
2194                let mut stream = circ.begin_dir_stream().await.unwrap();
2195                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
2196                stream.flush().await.unwrap();
2197                let mut buf = [0_u8; 1024];
2198                let n = stream.read(&mut buf).await.unwrap();
2199                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
2200                let n = stream.read(&mut buf).await.unwrap();
2201                assert_eq!(n, 0);
2202                stream
2203            };
2204            let reply_fut = async move {
2205                // We've disabled encryption on this circuit, so we can just
2206                // read the begindir cell.
2207                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2208                assert_eq!(id, Some(circid));
2209                let rmsg = match chmsg {
2210                    AnyChanMsg::Relay(r) => {
2211                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2212                            .unwrap()
2213                    }
2214                    other => panic!("{:?}", other),
2215                };
2216                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2217                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
2218
2219                // Reply with a Connected cell to indicate success.
2220                let connected = relaymsg::Connected::new_empty().into();
2221                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2222
2223                // Now read a DATA cell...
2224                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2225                assert_eq!(id, Some(circid));
2226                let rmsg = match chmsg {
2227                    AnyChanMsg::Relay(r) => {
2228                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2229                            .unwrap()
2230                    }
2231                    other => panic!("{:?}", other),
2232                };
2233                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
2234                assert_eq!(streamid_2, streamid);
2235                if let AnyRelayMsg::Data(d) = rmsg {
2236                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
2237                } else {
2238                    panic!();
2239                }
2240
2241                // Write another data cell in reply!
2242                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
2243                    .unwrap()
2244                    .into();
2245                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2246
2247                // Send an END cell to say that the conversation is over.
2248                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2249                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2250
2251                (rx, sink) // gotta keep these alive, or the reactor will exit.
2252            };
2253
2254            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2255        });
2256    }
2257
2258    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
2259    fn close_stream_helper(by_drop: bool) {
2260        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2261            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2262            let (circ, mut sink) = newcirc(&rt, chan).await;
2263
2264            let stream_fut = async move {
2265                let stream = circ
2266                    .begin_stream("www.example.com", 80, None)
2267                    .await
2268                    .unwrap();
2269
2270                let (r, mut w) = stream.split();
2271                if by_drop {
2272                    // Drop the writer and the reader, which should close the stream.
2273                    drop(r);
2274                    drop(w);
2275                    (None, circ) // make sure to keep the circuit alive
2276                } else {
2277                    // Call close on the writer, while keeping the reader alive.
2278                    w.close().await.unwrap();
2279                    (Some(r), circ)
2280                }
2281            };
2282            let handler_fut = async {
2283                // Read the BEGIN message.
2284                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2285                let rmsg = match msg {
2286                    AnyChanMsg::Relay(r) => {
2287                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2288                            .unwrap()
2289                    }
2290                    other => panic!("{:?}", other),
2291                };
2292                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2293                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2294
2295                // Reply with a CONNECTED.
2296                let connected =
2297                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2298                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2299
2300                // Expect an END.
2301                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2302                let rmsg = match msg {
2303                    AnyChanMsg::Relay(r) => {
2304                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2305                            .unwrap()
2306                    }
2307                    other => panic!("{:?}", other),
2308                };
2309                let (_, rmsg) = rmsg.into_streamid_and_msg();
2310                assert_eq!(rmsg.cmd(), RelayCmd::END);
2311
2312                (rx, sink) // keep these alive or the reactor will exit.
2313            };
2314
2315            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2316        });
2317    }
2318
2319    #[traced_test]
2320    #[test]
2321    fn drop_stream() {
2322        close_stream_helper(true);
2323    }
2324
2325    #[traced_test]
2326    #[test]
2327    fn close_stream() {
2328        close_stream_helper(false);
2329    }
2330
2331    // Set up a circuit and stream that expects some incoming SENDMEs.
2332    async fn setup_incoming_sendme_case<R: Runtime>(
2333        rt: &R,
2334        n_to_send: usize,
2335    ) -> (
2336        Arc<ClientCirc>,
2337        DataStream,
2338        CircuitRxSender,
2339        Option<StreamId>,
2340        usize,
2341        Receiver<AnyChanCell>,
2342        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2343    ) {
2344        let (chan, mut rx, sink2) = working_fake_channel(rt);
2345        let (circ, mut sink) = newcirc(rt, chan).await;
2346        let circid = circ.peek_circid();
2347
2348        let begin_and_send_fut = {
2349            let circ = circ.clone();
2350            async move {
2351                // Take our circuit and make a stream on it.
2352                let mut stream = circ
2353                    .begin_stream("www.example.com", 443, None)
2354                    .await
2355                    .unwrap();
2356                let junk = [0_u8; 1024];
2357                let mut remaining = n_to_send;
2358                while remaining > 0 {
2359                    let n = std::cmp::min(remaining, junk.len());
2360                    stream.write_all(&junk[..n]).await.unwrap();
2361                    remaining -= n;
2362                }
2363                stream.flush().await.unwrap();
2364                stream
2365            }
2366        };
2367
2368        let receive_fut = async move {
2369            // Read the begin cell.
2370            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2371            let rmsg = match chmsg {
2372                AnyChanMsg::Relay(r) => {
2373                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2374                        .unwrap()
2375                }
2376                other => panic!("{:?}", other),
2377            };
2378            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2379            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2380            // Reply with a connected cell...
2381            let connected = relaymsg::Connected::new_empty().into();
2382            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2383            // Now read bytes from the stream until we have them all.
2384            let mut bytes_received = 0_usize;
2385            let mut cells_received = 0_usize;
2386            while bytes_received < n_to_send {
2387                // Read a data cell, and remember how much we got.
2388                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2389                assert_eq!(id, Some(circid));
2390
2391                let rmsg = match chmsg {
2392                    AnyChanMsg::Relay(r) => {
2393                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2394                            .unwrap()
2395                    }
2396                    other => panic!("{:?}", other),
2397                };
2398                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2399                assert_eq!(streamid2, streamid);
2400                if let AnyRelayMsg::Data(dat) = rmsg {
2401                    cells_received += 1;
2402                    bytes_received += dat.as_ref().len();
2403                } else {
2404                    panic!();
2405                }
2406            }
2407
2408            (sink, streamid, cells_received, rx)
2409        };
2410
2411        let (stream, (sink, streamid, cells_received, rx)) =
2412            futures::join!(begin_and_send_fut, receive_fut);
2413
2414        (circ, stream, sink, streamid, cells_received, rx, sink2)
2415    }
2416
2417    #[traced_test]
2418    #[test]
2419    fn accept_valid_sendme() {
2420        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2421            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2422                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2423
2424            assert_eq!(cells_received, 301);
2425
2426            // Make sure that the circuit is indeed expecting the right sendmes
2427            {
2428                let (tx, rx) = oneshot::channel();
2429                circ.command
2430                    .unbounded_send(CtrlCmd::QuerySendWindow {
2431                        hop: 2.into(),
2432                        leg: circ.unique_id(),
2433                        done: tx,
2434                    })
2435                    .unwrap();
2436                let (window, tags) = rx.await.unwrap().unwrap();
2437                assert_eq!(window, 1000 - 301);
2438                assert_eq!(tags.len(), 3);
2439                // 100
2440                assert_eq!(
2441                    tags[0],
2442                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2443                );
2444                // 200
2445                assert_eq!(
2446                    tags[1],
2447                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2448                );
2449                // 300
2450                assert_eq!(
2451                    tags[2],
2452                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2453                );
2454            }
2455
2456            let reply_with_sendme_fut = async move {
2457                // make and send a circuit-level sendme.
2458                let c_sendme =
2459                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2460                        .into();
2461                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2462
2463                // Make and send a stream-level sendme.
2464                let s_sendme = relaymsg::Sendme::new_empty().into();
2465                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2466
2467                sink
2468            };
2469
2470            let _sink = reply_with_sendme_fut.await;
2471
2472            rt.advance_until_stalled().await;
2473
2474            // Now make sure that the circuit is still happy, and its
2475            // window is updated.
2476            {
2477                let (tx, rx) = oneshot::channel();
2478                circ.command
2479                    .unbounded_send(CtrlCmd::QuerySendWindow {
2480                        hop: 2.into(),
2481                        leg: circ.unique_id(),
2482                        done: tx,
2483                    })
2484                    .unwrap();
2485                let (window, _tags) = rx.await.unwrap().unwrap();
2486                assert_eq!(window, 1000 - 201);
2487            }
2488        });
2489    }
2490
2491    #[traced_test]
2492    #[test]
2493    fn invalid_circ_sendme() {
2494        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2495            // Same setup as accept_valid_sendme() test above but try giving
2496            // a sendme with the wrong tag.
2497
2498            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2499                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2500
2501            let reply_with_sendme_fut = async move {
2502                // make and send a circuit-level sendme with a bad tag.
2503                let c_sendme =
2504                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2505                        .into();
2506                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2507                sink
2508            };
2509
2510            let _sink = reply_with_sendme_fut.await;
2511
2512            // Check whether the reactor dies as a result of receiving invalid data.
2513            rt.advance_until_stalled().await;
2514            assert!(circ.is_closing());
2515        });
2516    }
2517
2518    #[traced_test]
2519    #[test]
2520    fn test_busy_stream_fairness() {
2521        // Number of streams to use.
2522        const N_STREAMS: usize = 3;
2523        // Number of cells (roughly) for each stream to send.
2524        const N_CELLS: usize = 20;
2525        // Number of bytes that *each* stream will send, and that we'll read
2526        // from the channel.
2527        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2528        // Ignoring cell granularity, with perfect fairness we'd expect
2529        // `N_BYTES/N_STREAMS` bytes from each stream.
2530        //
2531        // We currently allow for up to a full cell less than that.  This is
2532        // somewhat arbitrary and can be changed as needed, since we don't
2533        // provide any specific fairness guarantees.
2534        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2535            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2536
2537        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2538            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2539            let (circ, mut sink) = newcirc(&rt, chan).await;
2540
2541            // Run clients in a single task, doing our own round-robin
2542            // scheduling of writes to the reactor. Conversely, if we were to
2543            // put each client in its own task, we would be at the the mercy of
2544            // how fairly the runtime schedules the client tasks, which is outside
2545            // the scope of this test.
2546            rt.spawn({
2547                // Clone the circuit to keep it alive after writers have
2548                // finished with it.
2549                let circ = circ.clone();
2550                async move {
2551                    let mut clients = VecDeque::new();
2552                    struct Client {
2553                        stream: DataStream,
2554                        to_write: &'static [u8],
2555                    }
2556                    for _ in 0..N_STREAMS {
2557                        clients.push_back(Client {
2558                            stream: circ
2559                                .begin_stream("www.example.com", 80, None)
2560                                .await
2561                                .unwrap(),
2562                            to_write: &[0_u8; N_BYTES][..],
2563                        });
2564                    }
2565                    while let Some(mut client) = clients.pop_front() {
2566                        if client.to_write.is_empty() {
2567                            // Client is done. Don't put back in queue.
2568                            continue;
2569                        }
2570                        let written = client.stream.write(client.to_write).await.unwrap();
2571                        client.to_write = &client.to_write[written..];
2572                        clients.push_back(client);
2573                    }
2574                }
2575            })
2576            .unwrap();
2577
2578            let channel_handler_fut = async {
2579                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2580                let mut total_bytes_received = 0;
2581
2582                loop {
2583                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2584                    let rmsg = match msg {
2585                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2586                            RelayCellFormat::V0,
2587                            r.into_relay_body(),
2588                        )
2589                        .unwrap(),
2590                        other => panic!("Unexpected chanmsg: {other:?}"),
2591                    };
2592                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2593                    match rmsg.cmd() {
2594                        RelayCmd::BEGIN => {
2595                            // Add an entry for this stream.
2596                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2597                            assert_eq!(prev, None);
2598                            // Reply with a CONNECTED.
2599                            let connected = relaymsg::Connected::new_with_addr(
2600                                "10.0.0.1".parse().unwrap(),
2601                                1234,
2602                            )
2603                            .into();
2604                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2605                        }
2606                        RelayCmd::DATA => {
2607                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2608                            let nbytes = data_msg.as_ref().len();
2609                            total_bytes_received += nbytes;
2610                            let streamid = streamid.unwrap();
2611                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2612                            *stream_bytes += nbytes;
2613                            if total_bytes_received >= N_BYTES {
2614                                break;
2615                            }
2616                        }
2617                        RelayCmd::END => {
2618                            // Stream is done. If fair scheduling is working as
2619                            // expected we *probably* shouldn't get here, but we
2620                            // can ignore it and save the failure until we
2621                            // actually have the final stats.
2622                            continue;
2623                        }
2624                        other => {
2625                            panic!("Unexpected command {other:?}");
2626                        }
2627                    }
2628                }
2629
2630                // Return our stats, along with the `rx` and `sink` to keep the
2631                // reactor alive (since clients could still be writing).
2632                (total_bytes_received, stream_bytes_received, rx, sink)
2633            };
2634
2635            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2636                channel_handler_fut.await;
2637            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2638            for (sid, stream_bytes) in stream_bytes_received {
2639                assert!(
2640                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2641                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2642                );
2643            }
2644        });
2645    }
2646
2647    #[test]
2648    fn basic_params() {
2649        use super::CircParameters;
2650        let mut p = CircParameters::default();
2651        assert!(p.extend_by_ed25519_id);
2652
2653        p.extend_by_ed25519_id = false;
2654        assert!(!p.extend_by_ed25519_id);
2655    }
2656
2657    #[cfg(feature = "hs-service")]
2658    struct AllowAllStreamsFilter;
2659    #[cfg(feature = "hs-service")]
2660    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2661        fn disposition(
2662            &mut self,
2663            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
2664            _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
2665        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
2666            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
2667        }
2668    }
2669
2670    #[traced_test]
2671    #[test]
2672    #[cfg(feature = "hs-service")]
2673    fn allow_stream_requests_twice() {
2674        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2675            let (chan, _rx, _sink) = working_fake_channel(&rt);
2676            let (circ, _send) = newcirc(&rt, chan).await;
2677
2678            let _incoming = circ
2679                .allow_stream_requests(
2680                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2681                    circ.last_hop().unwrap(),
2682                    AllowAllStreamsFilter,
2683                )
2684                .await
2685                .unwrap();
2686
2687            let incoming = circ
2688                .allow_stream_requests(
2689                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2690                    circ.last_hop().unwrap(),
2691                    AllowAllStreamsFilter,
2692                )
2693                .await;
2694
2695            // There can only be one IncomingStream at a time on any given circuit.
2696            assert!(incoming.is_err());
2697        });
2698    }
2699
2700    #[traced_test]
2701    #[test]
2702    #[cfg(feature = "hs-service")]
2703    fn allow_stream_requests() {
2704        use tor_cell::relaycell::msg::BeginFlags;
2705
2706        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2707            const TEST_DATA: &[u8] = b"ping";
2708
2709            let (chan, _rx, _sink) = working_fake_channel(&rt);
2710            let (circ, mut send) = newcirc(&rt, chan).await;
2711
2712            let rfmt = RelayCellFormat::V0;
2713
2714            // A helper channel for coordinating the "client"/"service" interaction
2715            let (tx, rx) = oneshot::channel();
2716            let mut incoming = circ
2717                .allow_stream_requests(
2718                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2719                    circ.last_hop().unwrap(),
2720                    AllowAllStreamsFilter,
2721                )
2722                .await
2723                .unwrap();
2724
2725            let simulate_service = async move {
2726                let stream = incoming.next().await.unwrap();
2727                let mut data_stream = stream
2728                    .accept_data(relaymsg::Connected::new_empty())
2729                    .await
2730                    .unwrap();
2731                // Notify the client task we're ready to accept DATA cells
2732                tx.send(()).unwrap();
2733
2734                // Read the data the client sent us
2735                let mut buf = [0_u8; TEST_DATA.len()];
2736                data_stream.read_exact(&mut buf).await.unwrap();
2737                assert_eq!(&buf, TEST_DATA);
2738
2739                circ
2740            };
2741
2742            let simulate_client = async move {
2743                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2744                let body: BoxedCellBody =
2745                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2746                        .encode(rfmt, &mut testing_rng())
2747                        .unwrap();
2748                let begin_msg = chanmsg::Relay::from(body);
2749
2750                // Pretend to be a client at the other end of the circuit sending a begin cell
2751                send.send(ClientCircChanMsg::Relay(begin_msg))
2752                    .await
2753                    .unwrap();
2754
2755                // Wait until the service is ready to accept data
2756                // TODO: we shouldn't need to wait! This is needed because the service will reject
2757                // any DATA cells that aren't associated with a known stream. We need to wait until
2758                // the service receives our BEGIN cell (and the reactor updates hop.map with the
2759                // new stream).
2760                rx.await.unwrap();
2761                // Now send some data along the newly established circuit..
2762                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2763                let body: BoxedCellBody =
2764                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2765                        .encode(rfmt, &mut testing_rng())
2766                        .unwrap();
2767                let data_msg = chanmsg::Relay::from(body);
2768
2769                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2770                send
2771            };
2772
2773            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2774        });
2775    }
2776
2777    #[traced_test]
2778    #[test]
2779    #[cfg(feature = "hs-service")]
2780    fn accept_stream_after_reject() {
2781        use tor_cell::relaycell::msg::BeginFlags;
2782        use tor_cell::relaycell::msg::EndReason;
2783
2784        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2785            const TEST_DATA: &[u8] = b"ping";
2786            const STREAM_COUNT: usize = 2;
2787            let rfmt = RelayCellFormat::V0;
2788
2789            let (chan, _rx, _sink) = working_fake_channel(&rt);
2790            let (circ, mut send) = newcirc(&rt, chan).await;
2791
2792            // A helper channel for coordinating the "client"/"service" interaction
2793            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2794
2795            let mut incoming = circ
2796                .allow_stream_requests(
2797                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2798                    circ.last_hop().unwrap(),
2799                    AllowAllStreamsFilter,
2800                )
2801                .await
2802                .unwrap();
2803
2804            let simulate_service = async move {
2805                // Process 2 incoming streams
2806                for i in 0..STREAM_COUNT {
2807                    let stream = incoming.next().await.unwrap();
2808
2809                    // Reject the first one
2810                    if i == 0 {
2811                        stream
2812                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2813                            .await
2814                            .unwrap();
2815                        // Notify the client
2816                        tx.send(()).await.unwrap();
2817                        continue;
2818                    }
2819
2820                    let mut data_stream = stream
2821                        .accept_data(relaymsg::Connected::new_empty())
2822                        .await
2823                        .unwrap();
2824                    // Notify the client task we're ready to accept DATA cells
2825                    tx.send(()).await.unwrap();
2826
2827                    // Read the data the client sent us
2828                    let mut buf = [0_u8; TEST_DATA.len()];
2829                    data_stream.read_exact(&mut buf).await.unwrap();
2830                    assert_eq!(&buf, TEST_DATA);
2831                }
2832
2833                circ
2834            };
2835
2836            let simulate_client = async move {
2837                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2838                let body: BoxedCellBody =
2839                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2840                        .encode(rfmt, &mut testing_rng())
2841                        .unwrap();
2842                let begin_msg = chanmsg::Relay::from(body);
2843
2844                // Pretend to be a client at the other end of the circuit sending 2 identical begin
2845                // cells (the first one will be rejected by the test service).
2846                for _ in 0..STREAM_COUNT {
2847                    send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
2848                        .await
2849                        .unwrap();
2850
2851                    // Wait until the service rejects our request
2852                    rx.next().await.unwrap();
2853                }
2854
2855                // Now send some data along the newly established circuit..
2856                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2857                let body: BoxedCellBody =
2858                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2859                        .encode(rfmt, &mut testing_rng())
2860                        .unwrap();
2861                let data_msg = chanmsg::Relay::from(body);
2862
2863                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2864                send
2865            };
2866
2867            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2868        });
2869    }
2870
2871    #[traced_test]
2872    #[test]
2873    #[cfg(feature = "hs-service")]
2874    fn incoming_stream_bad_hop() {
2875        use tor_cell::relaycell::msg::BeginFlags;
2876
2877        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2878            /// Expect the originator of the BEGIN cell to be hop 1.
2879            const EXPECTED_HOP: u8 = 1;
2880            let rfmt = RelayCellFormat::V0;
2881
2882            let (chan, _rx, _sink) = working_fake_channel(&rt);
2883            let (circ, mut send) = newcirc(&rt, chan).await;
2884
2885            // Expect to receive incoming streams from hop EXPECTED_HOP
2886            let mut incoming = circ
2887                .allow_stream_requests(
2888                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2889                    (circ.unique_id(), EXPECTED_HOP.into()).into(),
2890                    AllowAllStreamsFilter,
2891                )
2892                .await
2893                .unwrap();
2894
2895            let simulate_service = async move {
2896                // The originator of the cell is actually the last hop on the circuit, not hop 1,
2897                // so we expect the reactor to shut down.
2898                assert!(incoming.next().await.is_none());
2899                circ
2900            };
2901
2902            let simulate_client = async move {
2903                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2904                let body: BoxedCellBody =
2905                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2906                        .encode(rfmt, &mut testing_rng())
2907                        .unwrap();
2908                let begin_msg = chanmsg::Relay::from(body);
2909
2910                // Pretend to be a client at the other end of the circuit sending a begin cell
2911                send.send(ClientCircChanMsg::Relay(begin_msg))
2912                    .await
2913                    .unwrap();
2914
2915                send
2916            };
2917
2918            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2919        });
2920    }
2921
2922    #[traced_test]
2923    #[test]
2924    #[cfg(feature = "conflux")]
2925    fn multipath_circ_validation() {
2926        use std::error::Error as _;
2927
2928        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2929            let params = CircParameters::default();
2930            let invalid_tunnels = [
2931                setup_bad_conflux_tunnel(&rt).await,
2932                setup_conflux_tunnel(&rt, true, params).await,
2933            ];
2934
2935            for tunnel in invalid_tunnels {
2936                let TestTunnelCtx {
2937                    tunnel: _tunnel,
2938                    circs: _circs,
2939                    conflux_link_rx,
2940                } = tunnel;
2941
2942                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2943                let err_src = conflux_hs_err.source().unwrap();
2944
2945                // The two circuits don't end in the same hop (no join point),
2946                // so the reactor will refuse to link them
2947                assert!(err_src
2948                    .to_string()
2949                    .contains("one more more conflux circuits are invalid"));
2950            }
2951        });
2952    }
2953
2954    // TODO: this structure could be reused for the other tests,
2955    // to address nickm's comment:
2956    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
2957    #[derive(Debug)]
2958    #[allow(unused)]
2959    #[cfg(feature = "conflux")]
2960    struct TestCircuitCtx {
2961        chan_rx: Receiver<AnyChanCell>,
2962        chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2963        circ_tx: CircuitRxSender,
2964        unique_id: UniqId,
2965    }
2966
2967    #[derive(Debug)]
2968    #[cfg(feature = "conflux")]
2969    struct TestTunnelCtx {
2970        tunnel: Arc<ClientCirc>,
2971        circs: Vec<TestCircuitCtx>,
2972        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
2973    }
2974
2975    /// Wait for a LINK cell to arrive on the specified channel and return its payload.
2976    #[cfg(feature = "conflux")]
2977    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2978        // Wait for the LINK cell...
2979        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2980        let rmsg = match chmsg {
2981            AnyChanMsg::Relay(r) => {
2982                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2983                    .unwrap()
2984            }
2985            other => panic!("{:?}", other),
2986        };
2987        let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2988
2989        let link = match rmsg {
2990            AnyRelayMsg::ConfluxLink(link) => link,
2991            _ => panic!("unexpected relay message {rmsg:?}"),
2992        };
2993
2994        assert!(streamid.is_none());
2995
2996        link
2997    }
2998
2999    #[cfg(feature = "conflux")]
3000    async fn setup_conflux_tunnel(
3001        rt: &MockRuntime,
3002        same_hops: bool,
3003        params: CircParameters,
3004    ) -> TestTunnelCtx {
3005        let hops1 = hop_details(3, 0);
3006        let hops2 = if same_hops {
3007            hops1.clone()
3008        } else {
3009            hop_details(3, 10)
3010        };
3011
3012        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
3013        let (circ1, sink1) = newcirc_ext(
3014            rt,
3015            UniqId::new(1, 3),
3016            chan1,
3017            hops1,
3018            2.into(),
3019            params.clone(),
3020        )
3021        .await;
3022
3023        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
3024
3025        let (circ2, sink2) =
3026            newcirc_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
3027
3028        let (answer_tx, answer_rx) = oneshot::channel();
3029        circ2
3030            .command
3031            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
3032            .unwrap();
3033
3034        let circuit = answer_rx.await.unwrap().unwrap();
3035        // The circuit should be shutting down its reactor
3036        rt.advance_until_stalled().await;
3037        assert!(circ2.is_closing());
3038
3039        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
3040        // Tell the first circuit to link with the second and form a multipath tunnel
3041        circ1
3042            .control
3043            .unbounded_send(CtrlMsg::LinkCircuits {
3044                circuits: vec![circuit],
3045                answer: conflux_link_tx,
3046            })
3047            .unwrap();
3048
3049        let circ_ctx1 = TestCircuitCtx {
3050            chan_rx: rx1,
3051            chan_tx: chan_sink1,
3052            circ_tx: sink1,
3053            unique_id: circ1.unique_id(),
3054        };
3055
3056        let circ_ctx2 = TestCircuitCtx {
3057            chan_rx: rx2,
3058            chan_tx: chan_sink2,
3059            circ_tx: sink2,
3060            unique_id: circ2.unique_id(),
3061        };
3062
3063        TestTunnelCtx {
3064            tunnel: circ1,
3065            circs: vec![circ_ctx1, circ_ctx2],
3066            conflux_link_rx,
3067        }
3068    }
3069
3070    #[cfg(feature = "conflux")]
3071    async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
3072        // Our 2 test circuits are identical, so they both have the same guards,
3073        // which technically violates the conflux set rule mentioned in prop354.
3074        // For testing purposes this is fine, but in production we'll need to ensure
3075        // the calling code prevents guard reuse (except in the case where
3076        // one of the guards happens to be Guard + Exit)
3077        let same_hops = true;
3078        let params = CircParameters::new(true, build_cc_vegas_params());
3079        setup_conflux_tunnel(rt, same_hops, params).await
3080    }
3081
3082    #[cfg(feature = "conflux")]
3083    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
3084        // The two circuits don't share any hops,
3085        // so they won't end in the same hop (no join point),
3086        // causing the reactor to refuse to link them.
3087        let same_hops = false;
3088        let params = CircParameters::new(true, build_cc_vegas_params());
3089        setup_conflux_tunnel(rt, same_hops, params).await
3090    }
3091
3092    #[traced_test]
3093    #[test]
3094    #[cfg(feature = "conflux")]
3095    fn reject_conflux_linked_before_hs() {
3096        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3097            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3098            let (circ, mut sink) = newcirc(&rt, chan).await;
3099
3100            let nonce = V1Nonce::new(&mut testing_rng());
3101            let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3102            // Send a LINKED cell
3103            let linked = relaymsg::ConfluxLinked::new(payload).into();
3104            sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3105
3106            rt.advance_until_stalled().await;
3107            assert!(circ.is_closing());
3108        });
3109    }
3110
3111    #[traced_test]
3112    #[test]
3113    #[cfg(feature = "conflux")]
3114    fn conflux_hs_timeout() {
3115        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3116            let TestTunnelCtx {
3117                tunnel: _tunnel,
3118                circs,
3119                conflux_link_rx,
3120            } = setup_good_conflux_tunnel(&rt).await;
3121
3122            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3123
3124            // Wait for the LINK cell
3125            let link = await_link_payload(&mut circ1.chan_rx).await;
3126
3127            // Send a LINK cell on the first leg...
3128            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3129            circ1
3130                .circ_tx
3131                .send(rmsg_to_ccmsg(None, linked))
3132                .await
3133                .unwrap();
3134
3135            // Do nothing, and wait for the handshake to timeout on the second leg
3136            rt.advance_by(Duration::from_secs(60)).await;
3137
3138            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3139
3140            // Get the handshake results of each circuit
3141            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
3142                conflux_hs_res.try_into().unwrap();
3143
3144            assert!(res1.is_ok());
3145
3146            let err = res2.unwrap_err();
3147            assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
3148        });
3149    }
3150
3151    #[traced_test]
3152    #[test]
3153    #[cfg(feature = "conflux")]
3154    fn conflux_bad_hs() {
3155        use crate::util::err::ConfluxHandshakeError;
3156
3157        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3158            let nonce = V1Nonce::new(&mut testing_rng());
3159            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3160            //let extended2 = relaymsg::Extended2::new(vec![]).into();
3161            let bad_hs_responses = [
3162                (
3163                    rmsg_to_ccmsg(
3164                        None,
3165                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
3166                    ),
3167                    "Received CONFLUX_LINKED cell with mismatched nonce",
3168                ),
3169                (
3170                    rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
3171                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
3172                ),
3173                (
3174                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3175                    "Received CONFLUX_SWITCH on unlinked circuit?!",
3176                ),
3177                // TODO: this currently causes the reactor to shut down immediately,
3178                // without sending a response on the handshake channel
3179                /*
3180                (
3181                    rmsg_to_ccmsg(None, extended2),
3182                    "Received CONFLUX_LINKED cell with mismatched nonce",
3183                ),
3184                */
3185            ];
3186
3187            for (bad_cell, expected_err) in bad_hs_responses {
3188                let TestTunnelCtx {
3189                    tunnel,
3190                    circs,
3191                    conflux_link_rx,
3192                } = setup_good_conflux_tunnel(&rt).await;
3193
3194                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3195
3196                // Respond with a bogus cell on one of the legs
3197                circ2.circ_tx.send(bad_cell).await.unwrap();
3198
3199                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3200                // Get the handshake results (the handshake results are reported early,
3201                // without waiting for the second circuit leg's handshake to timeout,
3202                // because this is a protocol violation causing the entire tunnel to shut down)
3203                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
3204                    conflux_hs_res.try_into().unwrap();
3205
3206                match res2.unwrap_err() {
3207                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
3208                        assert_eq!(e, expected_err);
3209                    }
3210                    e => panic!("unexpected error: {e:?}"),
3211                }
3212
3213                assert!(tunnel.is_closing());
3214            }
3215        });
3216    }
3217
3218    #[traced_test]
3219    #[test]
3220    #[cfg(feature = "conflux")]
3221    fn unexpected_conflux_cell() {
3222        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3223            let nonce = V1Nonce::new(&mut testing_rng());
3224            let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3225            let bad_cells = [
3226                rmsg_to_ccmsg(
3227                    None,
3228                    relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
3229                ),
3230                rmsg_to_ccmsg(
3231                    None,
3232                    relaymsg::ConfluxLink::new(link_payload.clone()).into(),
3233                ),
3234                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3235            ];
3236
3237            for bad_cell in bad_cells {
3238                let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3239                let (circ, mut sink) = newcirc(&rt, chan).await;
3240
3241                sink.send(bad_cell).await.unwrap();
3242                rt.advance_until_stalled().await;
3243
3244                // Note: unfortunately we can't assert the circuit is
3245                // closing for the reason, because the reactor just logs
3246                // the error and then exits.
3247                assert!(circ.is_closing());
3248            }
3249        });
3250    }
3251
3252    #[traced_test]
3253    #[test]
3254    #[cfg(feature = "conflux")]
3255    fn conflux_bad_linked() {
3256        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3257            let TestTunnelCtx {
3258                tunnel,
3259                circs,
3260                conflux_link_rx: _,
3261            } = setup_good_conflux_tunnel(&rt).await;
3262
3263            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3264
3265            let link = await_link_payload(&mut circ1.chan_rx).await;
3266
3267            // Send a LINK cell on the first leg...
3268            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3269            circ1
3270                .circ_tx
3271                .send(rmsg_to_ccmsg(None, linked))
3272                .await
3273                .unwrap();
3274
3275            // ...and two LINKED cells on the second
3276            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3277            circ2
3278                .circ_tx
3279                .send(rmsg_to_ccmsg(None, linked))
3280                .await
3281                .unwrap();
3282            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3283            circ2
3284                .circ_tx
3285                .send(rmsg_to_ccmsg(None, linked))
3286                .await
3287                .unwrap();
3288
3289            rt.advance_until_stalled().await;
3290
3291            // Receiving a LINKED cell on an already linked leg causes
3292            // the tunnel to be torn down
3293            assert!(tunnel.is_closing());
3294        });
3295    }
3296
3297    #[traced_test]
3298    #[test]
3299    #[cfg(feature = "conflux")]
3300    fn conflux_bad_switch() {
3301        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3302            let bad_switch = [
3303                // SWITCH cells with seqno = 0 are not allowed
3304                relaymsg::ConfluxSwitch::new(0),
3305                // TODO(#2031): from c-tor:
3306                //
3307                // We have to make sure that the switch command is truly
3308                // incrementing the sequence number, or else it becomes
3309                // a side channel that can be spammed for traffic analysis.
3310                //
3311                // We should figure out what this check is supposed to look like,
3312                // and have a test for it
3313            ];
3314
3315            for bad_cell in bad_switch {
3316                let TestTunnelCtx {
3317                    tunnel,
3318                    circs,
3319                    conflux_link_rx,
3320                } = setup_good_conflux_tunnel(&rt).await;
3321
3322                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3323
3324                let link = await_link_payload(&mut circ1.chan_rx).await;
3325
3326                // Send a LINKED cell on both legs
3327                for circ in [&mut circ1, &mut circ2] {
3328                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3329                    circ.circ_tx
3330                        .send(rmsg_to_ccmsg(None, linked))
3331                        .await
3332                        .unwrap();
3333                }
3334
3335                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3336                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3337
3338                // Now send a bad SWITCH cell on *both* legs.
3339                // This will cause both legs to be removed from the conflux set,
3340                // which causes the tunnel reactor to shut down
3341                for circ in [&mut circ1, &mut circ2] {
3342                    let msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
3343                    circ.circ_tx.send(msg).await.unwrap();
3344                }
3345
3346                // The tunnel should be shutting down
3347                rt.advance_until_stalled().await;
3348                assert!(tunnel.is_closing());
3349            }
3350        });
3351    }
3352
3353    // TODO(conflux): add a test for SWITCH handling
3354
3355    /// Run a conflux test endpoint.
3356    #[cfg(feature = "conflux")]
3357    #[derive(Debug)]
3358    enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
3359        /// Pretend to be an exit relay.
3360        Relay(ConfluxExitState<I>),
3361        /// Client task.
3362        Client {
3363            /// Channel for receiving the outcome of the conflux handshakes.
3364            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3365            /// The tunnel reactor handle
3366            tunnel: Arc<ClientCirc>,
3367            /// Data to send on a stream.
3368            send_data: Vec<u8>,
3369            /// Data we expect to receive on a stream.
3370            recv_data: Vec<u8>,
3371        },
3372    }
3373
3374    /// Structure for returning the sinks, channels, etc. that must stay
3375    /// alive until the test is complete.
3376    #[allow(unused, clippy::large_enum_variant)]
3377    #[derive(Debug)]
3378    #[cfg(feature = "conflux")]
3379    enum ConfluxEndpointResult {
3380        Circuit {
3381            tunnel: Arc<ClientCirc>,
3382            stream: DataStream,
3383        },
3384        Relay {
3385            circ: TestCircuitCtx,
3386        },
3387    }
3388
3389    /// Stream data, shared by all the mock exit endpoints.
3390    #[derive(Debug)]
3391    #[cfg(feature = "conflux")]
3392    struct ConfluxStreamState {
3393        /// The data received so far on this stream.
3394        data_recvd: Vec<u8>,
3395        /// The total amount of data we expect to receive on this stream.
3396        expected_data_len: usize,
3397        /// Whether we have seen a BEGIN cell yet.
3398        begin_recvd: bool,
3399        /// Whether we have seen an END cell yet.
3400        end_recvd: bool,
3401        /// Whether we have sent an END cell yet.
3402        end_sent: bool,
3403    }
3404
3405    /// An object describing a SWITCH cell that we expect to receive
3406    /// in the mock exit
3407    #[derive(Debug)]
3408    #[cfg(feature = "conflux")]
3409    struct ExpectedSwitch {
3410        /// The number of cells we've seen on this leg so far,
3411        /// up to and including the SWITCH.
3412        cells_so_far: usize,
3413        /// The expected seqno in SWITCH cell,
3414        seqno: u32,
3415    }
3416
3417    /// The state of a mock exit.
3418    #[derive(Debug)]
3419    #[cfg(feature = "conflux")]
3420    struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
3421        /// The runtime.
3422        runtime: Arc<AsyncMutex<MockRuntime>>,
3423        /// The client view of the tunnel.
3424        tunnel: Arc<ClientCirc>,
3425        /// The circuit test context.
3426        circ: TestCircuitCtx,
3427        /// The RTT delay to introduce just before each SENDME.
3428        ///
3429        /// Used to trigger the client to send a SWITCH.
3430        rtt_delays: I,
3431        /// State of the (only) expected stream on this tunnel,
3432        /// shared by all the mock exit endpoints.
3433        stream_state: Arc<Mutex<ConfluxStreamState>>,
3434        /// The number of cells after which to expect a SWITCH
3435        /// cell from the client.
3436        expect_switch: Vec<ExpectedSwitch>,
3437        /// Channel for receiving completion notifications from the other leg.
3438        done_rx: oneshot::Receiver<()>,
3439        /// Channel for sending completion notifications to the other leg.
3440        done_tx: oneshot::Sender<()>,
3441        /// Whether this circuit leg should act as the primary (sending) leg.
3442        is_sending_leg: bool,
3443    }
3444
3445    #[cfg(feature = "conflux")]
3446    async fn good_exit_handshake(
3447        runtime: &Arc<AsyncMutex<MockRuntime>>,
3448        init_rtt_delay: Option<Duration>,
3449        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3450        sink: &mut CircuitRxSender,
3451    ) {
3452        // Wait for the LINK cell
3453        let link = await_link_payload(rx).await;
3454
3455        // Introduce an artificial delay, to make one circ have a better initial RTT
3456        // than the other
3457        if let Some(init_rtt_delay) = init_rtt_delay {
3458            runtime.lock().await.advance_by(init_rtt_delay).await;
3459        }
3460
3461        // Reply with a LINKED cell...
3462        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3463        sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3464
3465        // Wait for the client to respond with LINKED_ACK...
3466        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3467        let rmsg = match chmsg {
3468            AnyChanMsg::Relay(r) => {
3469                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3470                    .unwrap()
3471            }
3472            other => panic!("{other:?}"),
3473        };
3474        let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3475
3476        assert!(matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_)));
3477    }
3478
3479    #[cfg(feature = "conflux")]
3480    async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
3481        state: ConfluxExitState<I>,
3482    ) -> ConfluxEndpointResult {
3483        let ConfluxExitState {
3484            runtime,
3485            tunnel,
3486            mut circ,
3487            rtt_delays,
3488            stream_state,
3489            mut expect_switch,
3490            done_tx,
3491            mut done_rx,
3492            is_sending_leg,
3493        } = state;
3494
3495        let mut rtt_delays = rtt_delays.into_iter();
3496
3497        // Expect the client to open a stream, and de-multiplex the received stream data
3498        let stream_len = stream_state.lock().unwrap().expected_data_len;
3499        let mut data_cells_received = 0_usize;
3500        let mut cell_count = 0_usize;
3501        let mut tags = vec![];
3502        let mut streamid = None;
3503
3504        while stream_state.lock().unwrap().data_recvd.len() < stream_len {
3505            use futures::select;
3506
3507            // Wait for the BEGIN cell to arrive, or for the transfer to complete
3508            // (we need to bail if the other leg already completed);
3509            let res = select! {
3510                res = circ.chan_rx.next() => {
3511                    res.unwrap()
3512                },
3513                res = done_rx => {
3514                    res.unwrap();
3515                    break;
3516                }
3517            };
3518
3519            let (_id, chmsg) = res.into_circid_and_msg();
3520            cell_count += 1;
3521            let rmsg = match chmsg {
3522                AnyChanMsg::Relay(r) => {
3523                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3524                        .unwrap()
3525                }
3526                other => panic!("{:?}", other),
3527            };
3528            let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
3529            if streamid.is_none() {
3530                streamid = new_streamid;
3531            }
3532
3533            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3534            let end_recvd = stream_state.lock().unwrap().end_recvd;
3535            match rmsg {
3536                AnyRelayMsg::Begin(_) if begin_recvd => {
3537                    panic!("client tried to open two streams?!");
3538                }
3539                AnyRelayMsg::Begin(_) if !begin_recvd => {
3540                    stream_state.lock().unwrap().begin_recvd = true;
3541                    // Reply with a connected cell...
3542                    let connected = relaymsg::Connected::new_empty().into();
3543                    circ.circ_tx
3544                        .send(rmsg_to_ccmsg(streamid, connected))
3545                        .await
3546                        .unwrap();
3547                }
3548                AnyRelayMsg::End(_) if !end_recvd => {
3549                    stream_state.lock().unwrap().end_recvd = true;
3550                    break;
3551                }
3552                AnyRelayMsg::End(_) if end_recvd => {
3553                    panic!("received two END cells for the same stream?!");
3554                }
3555                AnyRelayMsg::ConfluxSwitch(cell) => {
3556                    // Ensure we got the SWITCH after the expected number of cells
3557                    let expected = expect_switch.remove(0);
3558
3559                    assert_eq!(expected.cells_so_far, cell_count);
3560                    assert_eq!(expected.seqno, cell.seqno());
3561
3562                    // To keep the tests simple, we don't handle out of order cells,
3563                    // and simply sort the received data at the end.
3564                    // This ensures all the data was actually received,
3565                    // but it doesn't actually test that the SWITCH cells
3566                    // contain the appropriate seqnos.
3567                    continue;
3568                }
3569                AnyRelayMsg::Data(dat) => {
3570                    data_cells_received += 1;
3571                    stream_state
3572                        .lock()
3573                        .unwrap()
3574                        .data_recvd
3575                        .extend_from_slice(dat.as_ref());
3576
3577                    let is_next_cell_sendme = data_cells_received % 31 == 0;
3578                    if is_next_cell_sendme {
3579                        if tags.is_empty() {
3580                            // Important: we need to make sure all the SENDMEs
3581                            // we sent so far have been processed by the reactor
3582                            // (otherwise the next QuerySendWindow call
3583                            // might return an outdated list of tags!)
3584                            runtime.lock().await.advance_until_stalled().await;
3585                            let (tx, rx) = oneshot::channel();
3586                            tunnel
3587                                .command
3588                                .unbounded_send(CtrlCmd::QuerySendWindow {
3589                                    hop: 2.into(),
3590                                    leg: circ.unique_id,
3591                                    done: tx,
3592                                })
3593                                .unwrap();
3594
3595                            // Get a fresh batch of tags.
3596                            let (_window, new_tags) = rx.await.unwrap().unwrap();
3597                            tags = new_tags;
3598                        }
3599
3600                        let tag = tags.remove(0);
3601
3602                        // Introduce an artificial delay, to make one circ have worse RTT
3603                        // than the other, and thus trigger a SWITCH
3604                        if let Some(rtt_delay) = rtt_delays.next().flatten() {
3605                            runtime.lock().await.advance_by(rtt_delay).await;
3606                        }
3607                        // Make and send a circuit-level SENDME
3608                        let sendme = relaymsg::Sendme::from(tag).into();
3609
3610                        circ.circ_tx
3611                            .send(rmsg_to_ccmsg(None, sendme))
3612                            .await
3613                            .unwrap();
3614                    }
3615                }
3616                _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
3617            }
3618        }
3619
3620        let end_recvd = stream_state.lock().unwrap().end_recvd;
3621
3622        // Close the stream if the other endpoint hasn't already done so
3623        if is_sending_leg && !end_recvd {
3624            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
3625            circ.circ_tx
3626                .send(rmsg_to_ccmsg(streamid, end))
3627                .await
3628                .unwrap();
3629            stream_state.lock().unwrap().end_sent = true;
3630        }
3631
3632        // This is allowed to fail, because the other leg might have exited first.
3633        let _ = done_tx.send(());
3634
3635        // Ensure we received all the switch cells we were expecting
3636        assert!(
3637            expect_switch.is_empty(),
3638            "expect_switch = {expect_switch:?}"
3639        );
3640
3641        ConfluxEndpointResult::Relay { circ }
3642    }
3643
3644    #[cfg(feature = "conflux")]
3645    async fn run_conflux_client(
3646        tunnel: Arc<ClientCirc>,
3647        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3648        send_data: Vec<u8>,
3649        recv_data: Vec<u8>,
3650    ) -> ConfluxEndpointResult {
3651        let res = conflux_link_rx.await;
3652
3653        let res = res.unwrap().unwrap();
3654        assert_eq!(res.len(), 2);
3655
3656        // All circuit legs have completed the conflux handshake,
3657        // so we now have a multipath tunnel
3658
3659        // Now we're ready to open a stream
3660        let mut stream = tunnel
3661            .begin_stream("www.example.com", 443, None)
3662            .await
3663            .unwrap();
3664
3665        stream.write_all(&send_data).await.unwrap();
3666        stream.flush().await.unwrap();
3667
3668        let mut recv: Vec<u8> = Vec::new();
3669        let recv_len = stream.read_to_end(&mut recv).await.unwrap();
3670        assert_eq!(recv_len, recv_data.len());
3671        assert_eq!(recv_data, recv);
3672
3673        ConfluxEndpointResult::Circuit { tunnel, stream }
3674    }
3675
3676    #[cfg(feature = "conflux")]
3677    async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
3678        endpoint: ConfluxTestEndpoint<I>,
3679    ) -> ConfluxEndpointResult {
3680        match endpoint {
3681            ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3682            ConfluxTestEndpoint::Client {
3683                tunnel,
3684                conflux_link_rx,
3685                send_data,
3686                recv_data,
3687            } => run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await,
3688        }
3689    }
3690
3691    #[traced_test]
3692    #[test]
3693    #[cfg(feature = "conflux")]
3694    fn multipath_stream() {
3695        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3696            /// The number of data cells to send.
3697            const NUM_CELLS: usize = 300;
3698            /// 498 bytes per DATA cell.
3699            const CELL_SIZE: usize = 498;
3700
3701            let TestTunnelCtx {
3702                tunnel,
3703                circs,
3704                conflux_link_rx,
3705            } = setup_good_conflux_tunnel(&rt).await;
3706            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3707
3708            // The stream data we're going to send over the conflux tunnel
3709            let mut send_data = (0..255_u8)
3710                .cycle()
3711                .take(NUM_CELLS * CELL_SIZE)
3712                .collect::<Vec<_>>();
3713            let stream_state = Arc::new(Mutex::new(ConfluxStreamState {
3714                data_recvd: vec![],
3715                expected_data_len: send_data.len(),
3716                begin_recvd: false,
3717                end_recvd: false,
3718                end_sent: false,
3719            }));
3720
3721            let mut tasks = vec![];
3722
3723            // Channels used by the mock relays to notify each other
3724            // of stream transfer completion.
3725            let (tx1, rx1) = oneshot::channel();
3726            let (tx2, rx2) = oneshot::channel();
3727
3728            // The 9 RTT delays to insert before each of the 9 SENDMEs
3729            // the exit will end up sending.
3730            //
3731            // Note: the first delay is the init_rtt delay (measured during the conflux HS).
3732            let circ1_rtt_delays = [
3733                // Initially, circ1 has better RTT, so we will start on this leg.
3734                Some(Duration::from_millis(100)),
3735                // But then its RTT takes a turn for the worse,
3736                // triggering a switch after the first SENDME is processed
3737                // (this happens after sending 123 DATA cells).
3738                Some(Duration::from_millis(500)),
3739                Some(Duration::from_millis(700)),
3740                Some(Duration::from_millis(900)),
3741                Some(Duration::from_millis(1100)),
3742                Some(Duration::from_millis(1300)),
3743                Some(Duration::from_millis(1500)),
3744                Some(Duration::from_millis(1700)),
3745                Some(Duration::from_millis(1900)),
3746                Some(Duration::from_millis(2100)),
3747            ]
3748            .into_iter();
3749
3750            let circ2_rtt_delays = [
3751                Some(Duration::from_millis(200)),
3752                Some(Duration::from_millis(400)),
3753                Some(Duration::from_millis(600)),
3754                Some(Duration::from_millis(800)),
3755                Some(Duration::from_millis(1000)),
3756                Some(Duration::from_millis(1200)),
3757                Some(Duration::from_millis(1400)),
3758                Some(Duration::from_millis(1600)),
3759                Some(Duration::from_millis(1800)),
3760                Some(Duration::from_millis(2000)),
3761            ]
3762            .into_iter();
3763
3764            // Note: we can't have two advance_by() calls running
3765            // at the same time (it's a limitation of MockRuntime),
3766            // so we need to be careful to not cause concurrent delays
3767            // on the two circuits.
3768            let relays = [
3769                (
3770                    circ1,
3771                    tx1,
3772                    rx2,
3773                    vec![ExpectedSwitch {
3774                        // We start on this leg, and receive a BEGIN cell,
3775                        // followed by (4 * 31 - 1) = 123 DATA cells.
3776                        // Then it becomes blocked on CC, then finally the reactor
3777                        // realizes it has some SENDMEs to process, and
3778                        // then as a result of the new RTT measurement, we switch to circ1,
3779                        // and then finally we switch back here, and get another SWITCH
3780                        // as the 126th cell.
3781                        cells_so_far: 126,
3782                        // Leg 2 switches back to this leg after the 249th cell
3783                        // (just before sending the 250th one):
3784                        // seqno = 125 carried over from leg 1 (see the seqno of the
3785                        // SWITCH expected on leg 2 below), plus 1 SWITCH, plus
3786                        // 4 * 31 = 124 DATA cells after which the RTT of the first leg
3787                        // is deemed favorable again.
3788                        //
3789                        // 249 - 125 (last_seq_sent of leg 1) = 124
3790                        seqno: 124,
3791                    }],
3792                    circ1_rtt_delays,
3793                    true,
3794                ),
3795                (
3796                    circ2,
3797                    tx2,
3798                    rx1,
3799                    vec![ExpectedSwitch {
3800                        // The SWITCH is the first cell we received after the conflux HS
3801                        // on this leg.
3802                        cells_so_far: 1,
3803                        // See explanation on the ExpectedSwitch from circ1 above.
3804                        seqno: 125,
3805                    }],
3806                    circ2_rtt_delays,
3807                    false,
3808                ),
3809            ];
3810
3811            let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3812            for (mut circ, done_tx, done_rx, expect_switch, mut rtt_delays, is_sending_leg) in
3813                relays.into_iter()
3814            {
3815                let leg = circ.unique_id;
3816
3817                // Do the conflux handshake
3818                //
3819                // We do this outside of run_conflux_endpoint,
                // to avoid running both handshakes concurrently
3821                // (this gives more predictable RTT delays:
3822                // if both handshake tasks run at once, they race
3823                // to advance the mock runtime's clock)
3824                good_exit_handshake(
3825                    &relay_runtime,
3826                    rtt_delays.next().flatten(),
3827                    &mut circ.chan_rx,
3828                    &mut circ.circ_tx,
3829                )
3830                .await;
3831
3832                let relay = ConfluxTestEndpoint::Relay(ConfluxExitState {
3833                    runtime: Arc::clone(&relay_runtime),
3834                    tunnel: Arc::clone(&tunnel),
3835                    circ,
3836                    rtt_delays,
3837                    stream_state: Arc::clone(&stream_state),
3838                    expect_switch,
3839                    done_tx,
3840                    done_rx,
3841                    is_sending_leg,
3842                });
3843
3844                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3845            }
3846
3847            tasks.push(rt.spawn_join(
3848                "client task".to_string(),
3849                run_conflux_endpoint(ConfluxTestEndpoint::Client {
3850                    tunnel,
3851                    conflux_link_rx,
3852                    send_data: send_data.clone(),
3853                    recv_data: vec![],
3854                }),
3855            ));
3856            let _sinks = futures::future::join_all(tasks).await;
3857            let mut stream_state = stream_state.lock().unwrap();
3858            assert!(stream_state.begin_recvd);
3859
3860            stream_state.data_recvd.sort();
3861            send_data.sort();
3862            assert_eq!(stream_state.data_recvd, send_data);
3863        });
3864    }
3865
3866    #[traced_test]
3867    #[test]
3868    #[cfg(all(feature = "conflux", feature = "hs-service"))]
3869    fn conflux_incoming_stream() {
3870        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3871            use std::error::Error as _;
3872
3873            const EXPECTED_HOP: u8 = 1;
3874
3875            let TestTunnelCtx {
3876                tunnel,
3877                circs,
3878                conflux_link_rx,
3879            } = setup_good_conflux_tunnel(&rt).await;
3880
3881            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3882
3883            let link = await_link_payload(&mut circ1.chan_rx).await;
3884            for circ in [&mut circ1, &mut circ2] {
3885                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3886                circ.circ_tx
3887                    .send(rmsg_to_ccmsg(None, linked))
3888                    .await
3889                    .unwrap();
3890            }
3891
3892            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3893            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3894
3895            // TODO(#2002): we don't currently support conflux for onion services
3896            let err = tunnel
3897                .allow_stream_requests(
3898                    &[tor_cell::relaycell::RelayCmd::BEGIN],
3899                    (tunnel.unique_id(), EXPECTED_HOP.into()).into(),
3900                    AllowAllStreamsFilter,
3901                )
3902                .await
3903                // IncomingStream doesn't impl Debug, so we need to map to a different type
3904                .map(|_| ())
3905                .unwrap_err();
3906
3907            let err_src = err.source().unwrap();
3908            assert!(err_src
3909                .to_string()
3910                .contains("Cannot allow stream requests on tunnel with 2 legs"));
3911        });
3912    }
3913
    // TODO: add a test where the client receives data on a multipath stream,
    // and ensure it handles out-of-order (ooo) cells correctly
3916}