tor_proto/tunnel/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
//! Right now, we only implement "client circuits" -- also sometimes
//! called "origin circuits".  A client circuit is one that is
//! constructed by this Tor instance, and used on its own behalf to
//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_circ] method.  This yields
14//! a [PendingClientCirc] object that won't become live until you call
15//! one of the methods
16//! (typically [`PendingClientCirc::create_firsthop`])
17//! that extends it to its first hop.  After you've
18//! done that, you can call [`ClientCirc::extend`] on the circuit to
19//! build it into a multi-hop circuit.  Finally, you can use
20//! [ClientCirc::begin_stream] to get a Stream object that can be used
21//! for anonymized data.
22//!
23//! # Implementation
24//!
25//! Each open circuit has a corresponding Reactor object that runs in
26//! an asynchronous task, and manages incoming cells from the
27//! circuit's upstream channel.  These cells are either RELAY cells or
28//! DESTROY cells.  DESTROY cells are handled immediately.
29//! RELAY cells are either for a particular stream, in which case they
30//! get forwarded to a RawCellStream object, or for no particular stream,
31//! in which case they are considered "meta" cells (like EXTENDED2)
32//! that should only get accepted if something is waiting for them.
33//!
34//! # Limitations
35//!
36//! This is client-only.
37
38pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::circuit::handshake::RelayCryptLayerProtocol;
51use crate::congestion::params::CongestionControlParams;
52use crate::crypto::cell::HopNum;
53use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
54use crate::memquota::{CircuitAccount, SpecificAccount as _};
55use crate::stream::queue::stream_queue;
56use crate::stream::xon_xoff::XonXoffReaderCtrl;
57use crate::stream::{
58    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
59    StreamRateLimit, StreamReceiver,
60};
61use crate::tunnel::circuit::celltypes::*;
62use crate::tunnel::reactor::CtrlCmd;
63use crate::tunnel::reactor::{
64    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
65};
66use crate::tunnel::{StreamTarget, TargetHop};
67use crate::util::notify::NotifySender;
68use crate::util::skew::ClockSkew;
69use crate::{Error, ResolveError, Result};
70use cfg_if::cfg_if;
71use educe::Educe;
72use path::HopDetail;
73use postage::watch;
74use tor_cell::relaycell::{self, RelayCellFormat};
75use tor_cell::{
76    chancell::CircId,
77    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
78};
79
80use tor_error::{bad_api_usage, internal, into_internal};
81use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
82use tor_protover::named;
83
84pub use crate::crypto::binding::CircuitBinding;
85pub use crate::memquota::StreamAccount;
86pub use crate::tunnel::circuit::unique_id::UniqId;
87
88#[cfg(feature = "hs-service")]
89use {
90    crate::stream::{IncomingCmdChecker, IncomingStream},
91    crate::tunnel::reactor::StreamReqInfo,
92};
93
94use futures::channel::mpsc;
95use oneshot_fused_workaround as oneshot;
96
97use crate::congestion::sendme::StreamRecvWindow;
98use crate::DynTimeProvider;
99use futures::FutureExt as _;
100use std::collections::HashMap;
101use std::net::IpAddr;
102use std::sync::{Arc, Mutex};
103use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
104
105use crate::crypto::handshake::ntor::NtorPublicKey;
106
107pub use path::{Path, PathEntry};
108
/// The size of the buffer for communication between `ClientCirc` and its reactor.
//
// NOTE(review): nothing in this part of the file references this constant
// (the `control` and `command` senders on `ClientCirc` are unbounded);
// presumably the unit is "queued items" -- confirm at the use site.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
111
112#[cfg(feature = "send-control-msg")]
113use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
114
115pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
116#[cfg(feature = "send-control-msg")]
117#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
118pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
119
/// Sending half of an `mq_queue` MPSC queue that carries items of type `T`
/// for a stream (either inbound or outbound).
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// Receiving half of an `mq_queue` MPSC queue that carries items of type `T`
/// for a stream (either inbound or outbound).
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;

/// Sending half of the `mq_queue` MPSC queue that carries inbound
/// `ClientCircChanMsg`s on their way from the channel to the circuit.
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
/// Receiving half of the `mq_queue` MPSC queue that carries inbound
/// `ClientCircChanMsg`s on their way from the channel to the circuit.
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
129
#[derive(Debug)]
/// A circuit that we have constructed over the Tor network.
///
/// # Circuit life cycle
///
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
/// one of these, you invoke one of its `create_firsthop` methods (typically
/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
/// [`create_firsthop()`](PendingClientCirc::create_firsthop)).
/// Then, to add more hops to the circuit, you can call
/// [`extend()`](ClientCirc::extend) on it.
///
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
/// `tor-proto` are probably not what you need.
///
/// After a circuit is created, it will persist until it is closed in one of
/// five ways:
///    1. A remote error occurs.
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
///       circuit.
///    3. The circuit's channel is closed.
///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
///       circuit from closing until all those streams have gone away.)
///
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
/// will just be unusable for most purposes.  Most operations on it will fail
/// with an error.
//
// Effectively, this struct contains two Arcs: one for `path` and one for
// `control` (which surely has something Arc-like in it).  We cannot unify
// these by putting a single Arc around the whole struct, and passing
// an Arc strong reference to the `Reactor`, because then `control` would
// not be dropped when the last user of the circuit goes away.  We could
// make the reactor have a weak reference but weak references are more
// expensive to dereference.
//
// Because of the above, cloning this struct is always going to involve
// two atomic refcount changes/checks.  Wrapping it in another Arc would
// be overkill.
//
pub struct ClientCirc {
    /// Mutable state shared with the `Reactor`.
    ///
    /// This is a map keyed by [`UniqId`]; the accessors on this type look up
    /// this circuit's own entry using `unique_id`.
    mutable: Arc<TunnelMutableState>,
    /// A unique identifier for this circuit.
    ///
    /// Also the key under which this circuit's state is looked up in `mutable`.
    unique_id: UniqId,
    /// Channel to send control messages to the reactor.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Channel to send commands to the reactor.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// A future that resolves to Cancelled once the reactor is shut down,
    /// meaning that the circuit is closed.
    ///
    /// The channel's item type is `void::Void`, which has no values,
    /// so nothing can ever be sent on it:
    /// the only way for this future to complete is by cancellation.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// For testing purposes: the CircId, for use in peek_circid().
    #[cfg(test)]
    circid: CircId,
    /// Memory quota account
    ///
    /// Exposed to callers through [`ClientCirc::mq_account`].
    memquota: CircuitAccount,
    /// Time provider
    time_provider: DynTimeProvider,
}
194
/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
///
/// This is a mutex-protected map from the [`UniqId`] of each circuit
/// to that circuit's [`MutableState`].
///
/// NOTE(gabi): this mutex-inside-a-mutex
/// (the map's mutex here, plus the mutex inside each [`MutableState`])
/// might look suspicious,
/// but it is currently the best option we have for sharing
/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
/// In practice, these mutexes won't be accessed very often
/// (they're accessed for writing when a circuit is extended,
/// and for reading by the various `ClientCirc` APIs),
/// so they shouldn't really impact performance.
///
/// Alternatively, the circuit state information could be shared
/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
/// but in #1840 @opara notes that this involves making the `ClientCirc` accessors
/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
/// asynchronous, which will significantly complicate their callsites,
/// which would in turn need to be made async too.
///
/// We should revisit this decision at some point, and decide whether an async API
/// would be preferable.
#[derive(Debug, Default)]
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
216
217impl TunnelMutableState {
218    /// Add the [`MutableState`] of a circuit.
219    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
220        #[allow(unused)] // unused in non-debug builds
221        let state = self
222            .0
223            .lock()
224            .expect("lock poisoned")
225            .insert(unique_id, mutable);
226
227        debug_assert!(state.is_none());
228    }
229
230    /// Remove the [`MutableState`] of a circuit.
231    pub(super) fn remove(&self, unique_id: UniqId) {
232        #[allow(unused)] // unused in non-debug builds
233        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
234
235        debug_assert!(state.is_some());
236    }
237
238    /// Return a [`Path`] object describing all the hops in the specified circuit.
239    ///
240    /// See [`MutableState::path`].
241    fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
242        let lock = self.0.lock().expect("lock poisoned");
243        let mutable = lock
244            .get(&unique_id)
245            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
246
247        Ok(mutable.path())
248    }
249
250    /// Return a description of the first hop of this circuit.
251    ///
252    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
253    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
254    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
255        let lock = self.0.lock().expect("lock poisoned");
256        let mutable = lock
257            .get(&unique_id)
258            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
259
260        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
261            path::HopDetail::Relay(r) => r,
262            #[cfg(feature = "hs-common")]
263            path::HopDetail::Virtual => {
264                panic!("somehow made a circuit with a virtual first hop.")
265            }
266        });
267
268        Ok(first_hop)
269    }
270
271    /// Return the [`HopNum`] of the last hop of the specified circuit.
272    ///
273    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
274    ///
275    /// See [`MutableState::last_hop_num`].
276    fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
277        let lock = self.0.lock().expect("lock poisoned");
278        let mutable = lock
279            .get(&unique_id)
280            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
281
282        Ok(mutable.last_hop_num())
283    }
284
285    /// Return the number of hops in the specified circuit.
286    ///
287    /// See [`MutableState::n_hops`].
288    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
289        let lock = self.0.lock().expect("lock poisoned");
290        let mutable = lock
291            .get(&unique_id)
292            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
293
294        Ok(mutable.n_hops())
295    }
296
297    /// Return the number of legs in this tunnel.
298    ///
299    /// TODO(conflux-fork): this can be removed once we modify `path_ref`
300    /// to return *all* the Paths in the tunnel.
301    fn n_legs(&self) -> usize {
302        let lock = self.0.lock().expect("lock poisoned");
303        lock.len()
304    }
305}
306
/// The mutable state of a circuit.
///
/// This is a [`CircuitState`] behind a mutex.
/// Each accessor on this type takes the lock for the duration of the call.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState(Mutex<CircuitState>);
311
312impl MutableState {
313    /// Add a hop to the path of this circuit.
314    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
315        let mut mutable = self.0.lock().expect("poisoned lock");
316        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
317        mutable.binding.push(binding);
318    }
319
320    /// Get a copy of the circuit's current [`path::Path`].
321    pub(super) fn path(&self) -> Arc<path::Path> {
322        let mutable = self.0.lock().expect("poisoned lock");
323        Arc::clone(&mutable.path)
324    }
325
326    /// Return the cryptographic material used to prove knowledge of a shared
327    /// secret with with `hop`.
328    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
329        let mutable = self.0.lock().expect("poisoned lock");
330
331        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
332        // NOTE: I'm not thrilled to have to copy this information, but we use
333        // it very rarely, so it's not _that_ bad IMO.
334    }
335
336    /// Return a description of the first hop of this circuit.
337    fn first_hop(&self) -> Option<HopDetail> {
338        let mutable = self.0.lock().expect("poisoned lock");
339        mutable.path.first_hop()
340    }
341
342    /// Return the [`HopNum`] of the last hop of this circuit.
343    ///
344    /// NOTE: This function will return the [`HopNum`] of the hop
345    /// that is _currently_ the last. If there is an extend operation in progress,
346    /// the currently pending hop may or may not be counted, depending on whether
347    /// the extend operation finishes before this call is done.
348    fn last_hop_num(&self) -> Option<HopNum> {
349        let mutable = self.0.lock().expect("poisoned lock");
350        mutable.path.last_hop_num()
351    }
352
353    /// Return the number of hops in this circuit.
354    ///
355    /// NOTE: This function will currently return only the number of hops
356    /// _currently_ in the circuit. If there is an extend operation in progress,
357    /// the currently pending hop may or may not be counted, depending on whether
358    /// the extend operation finishes before this call is done.
359    fn n_hops(&self) -> usize {
360        let mutable = self.0.lock().expect("poisoned lock");
361        mutable.path.n_hops()
362    }
363}
364
/// The shared state of a circuit.
///
/// This is the data protected by the mutex in [`MutableState`].
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct CircuitState {
    /// Information about this circuit's path.
    ///
    /// This is stored in an Arc so that we can cheaply give a copy of it to
    /// client code; when we need to add a hop (which is less frequent) we use
    /// [`Arc::make_mut()`].
    path: Arc<path::Path>,

    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
    /// in the circuit's path.
    ///
    /// One entry is pushed here each time a hop is pushed onto `path`
    /// (see [`MutableState::add_hop`]), and entries are looked up by hop number.
    ///
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
    /// fair chance that this will change in the future, and I don't want other
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
    /// an `Option`.
    ///
    /// Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    binding: Vec<Option<CircuitBinding>>,
}
386
/// A `ClientCirc` that needs to send a create cell and receive a created* cell.
///
/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
/// to negotiate the cryptographic handshake with the first hop.
pub struct PendingClientCirc {
    /// A oneshot receiver on which we'll receive a CREATED* cell,
    /// or a DESTROY cell.
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The `ClientCirc` object that we can expose on success.
    circ: Arc<ClientCirc>,
}
398
/// Description of the network's current rules for building circuits.
///
/// This type describes rules derived from the consensus,
/// and possibly amended by our own configuration.
///
/// Typically, this type is created once for an entire circuit,
/// and any special per-hop information is derived
/// from each hop as a CircTarget.
/// Note however that callers _may_ provide different `CircParameters`
/// for different hops within a circuit if they have some reason to do so,
/// so we do not enforce that every hop in a circuit has the same `CircParameters`.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// Whether we should include ed25519 identities when we send
    /// EXTEND2 cells.
    pub extend_by_ed25519_id: bool,
    /// Congestion control parameters for this circuit.
    pub ccontrol: CongestionControlParams,

    /// Maximum number of permitted incoming relay cells for each hop.
    ///
    /// If we would receive more relay cells than this from a single hop,
    /// we close the circuit with [`ExcessInboundCells`](Error::ExcessInboundCells).
    ///
    /// If this value is None, then there is no limit to the number of inbound cells.
    ///
    /// Known limitation: If this value is `u32::MAX`,
    /// then a limit of `u32::MAX - 1` is enforced.
    pub n_incoming_cells_permitted: Option<u32>,

    /// Maximum number of permitted outgoing relay cells for each hop.
    ///
    /// If we would try to send more relay cells than this from a single hop,
    /// we close the circuit with [`ExcessOutboundCells`](Error::ExcessOutboundCells).
    /// It is the circuit-user's responsibility to make sure that this does not happen.
    ///
    /// This setting is used to ensure that we do not violate a limit
    /// imposed by `n_incoming_cells_permitted`
    /// on the other side of a circuit.
    ///
    /// If this value is None, then there is no limit to the number of outbound cells.
    ///
    /// Known limitation: If this value is `u32::MAX`,
    /// then a limit of `u32::MAX - 1` is enforced.
    pub n_outgoing_cells_permitted: Option<u32>,
}
446
/// Type of negotiation that we'll be performing as we establish a hop.
///
/// Determines what flavor of extensions we can send and receive, which in turn
/// limits the hop settings we can negotiate.
///
// TODO-CGO: This is likely to be refactored when we finally add support for
// HsV3+CGO.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(super) enum HopNegotiationType {
    /// We're using a handshake in which extension-based negotiation cannot occur.
    ///
    /// For this type, [`HopSettings::from_params_and_caps`] selects the fallback
    /// congestion control algorithm and the `Tor1` relay encryption protocol.
    None,
    /// We're using the HsV3-ntor handshake, in which the client can send extensions,
    /// but the server cannot.
    ///
    /// As a special case, the default relay encryption protocol is the hsv3
    /// variant of Tor1.
    ///
    /// For this type, [`HopSettings::from_params_and_caps`] currently selects
    /// the fallback congestion control algorithm.
    //
    // We would call this "HalfDuplex" or something, but we do not expect to add
    // any more handshakes of this type.
    HsV3,
    /// We're using a handshake in which both client and relay can send extensions.
    Full,
}
470
/// The settings we use for a single hop of a circuit.
///
/// Unlike [`CircParameters`], this type is crate-internal.
/// We construct it based on our settings from the circuit,
/// and from the hop's actual capabilities.
/// Then, we negotiate with the hop as part of circuit
/// creation/extension to determine the actual settings that will be in use.
/// Finally, we use those settings to construct the negotiated circuit hop.
//
// TODO: Relays should probably derive an instance of this type too, as
// part of the circuit creation handshake.
#[derive(Clone, Debug)]
pub(super) struct HopSettings {
    /// The negotiated congestion control settings for this hop.
    pub(super) ccontrol: CongestionControlParams,

    /// Maximum number of permitted incoming relay cells for this hop.
    ///
    /// `None` means that there is no limit.
    pub(super) n_incoming_cells_permitted: Option<u32>,

    /// Maximum number of permitted outgoing relay cells for this hop.
    ///
    /// `None` means that there is no limit.
    pub(super) n_outgoing_cells_permitted: Option<u32>,

    /// The relay cell encryption algorithm and cell format for this hop.
    ///
    /// Read this through [`HopSettings::relay_crypt_protocol`].
    relay_crypt_protocol: RelayCryptLayerProtocol,
}
496
497impl HopSettings {
498    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
499    /// and `caps` (a set of protocol capabilities for a circuit target).
500    ///
501    /// The resulting settings will represent what the client would prefer to negotiate
502    /// (determined by `params`),
503    /// as modified by what the target relay is believed to support (represented by `caps`).
504    ///
505    /// This represents the `HopSettings` in a pre-negotiation state:
506    /// the circuit negotiation process will modify it.
507    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
508    pub(super) fn from_params_and_caps(
509        hoptype: HopNegotiationType,
510        params: &CircParameters,
511        caps: &tor_protover::Protocols,
512    ) -> Result<Self> {
513        let mut ccontrol = params.ccontrol.clone();
514        match ccontrol.alg() {
515            crate::ccparams::Algorithm::FixedWindow(_) => {}
516            crate::ccparams::Algorithm::Vegas(_) => {
517                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
518                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
519                    ccontrol.use_fallback_alg();
520                }
521            }
522        };
523        if hoptype == HopNegotiationType::None {
524            ccontrol.use_fallback_alg();
525        } else if hoptype == HopNegotiationType::HsV3 {
526            // TODO #2037, TODO-CGO: We need a way to send congestion control extensions
527            // in this case too.  But since we aren't sending them, we
528            // should use the fallback algorithm.
529            ccontrol.use_fallback_alg();
530        }
531        let ccontrol = ccontrol; // drop mut
532
533        // Negotiate CGO if it is supported, if CC is also supported,
534        // and if CGO is available on this relay.
535        let relay_crypt_protocol = match hoptype {
536            HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
537            HopNegotiationType::HsV3 => {
538                // TODO-CGO: Support CGO when available.
539                cfg_if! {
540                    if #[cfg(feature = "hs-common")] {
541                        RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
542                    } else {
543                        return Err(
544                            internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
545                        );
546                    }
547                }
548            }
549            HopNegotiationType::Full => {
550                cfg_if! {
551                    if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
552                        #[allow(clippy::overly_complex_bool_expr)]
553                        if  ccontrol.alg().compatible_with_cgo()
554                            && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
555                            && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
556                            && false // TODO CGO REMOVE once we are ready to enable CGO.
557                            // (We aren't enabling it yet because CC is not yet negotiable.)
558                        {
559                            RelayCryptLayerProtocol::Cgo
560                        } else {
561                            RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
562                        }
563                    } else {
564                        RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
565                    }
566                }
567            }
568        };
569
570        Ok(Self {
571            ccontrol,
572            relay_crypt_protocol,
573            n_incoming_cells_permitted: params.n_incoming_cells_permitted,
574            n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
575        })
576    }
577
578    /// Return the negotiated relay crypto protocol.
579    pub(super) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
580        // TODO CGO: Remove this once we are ready to enable CGO.
581        // (We aren't enabling it yet because CC is not yet negotiable.)
582        #[cfg(feature = "counter-galois-onion")]
583        assert!(
584            !matches!(self.relay_crypt_protocol, RelayCryptLayerProtocol::Cgo),
585            "Somehow negotiated CGO, but CGO is not yet supported!!"
586        );
587        self.relay_crypt_protocol
588    }
589}
590
#[cfg(test)]
impl std::default::Default for CircParameters {
    /// Test-only defaults: extend by ed25519 identity, use the fixed
    /// congestion control parameters from the test utilities,
    /// and place no limit on incoming or outgoing cells.
    fn default() -> Self {
        let ccontrol = crate::congestion::test_utils::params::build_cc_fixed_params();

        Self {
            extend_by_ed25519_id: true,
            ccontrol,
            n_incoming_cells_permitted: None,
            n_outgoing_cells_permitted: None,
        }
    }
}
602
impl CircParameters {
    /// Construct a new `CircParameters` from the given settings.
    ///
    /// `extend_by_ed25519_id` and `ccontrol` are stored as given.
    /// Both cell limits (`n_incoming_cells_permitted` and
    /// `n_outgoing_cells_permitted`) start out as `None`, meaning "no limit";
    /// since those fields are public, callers can set them afterwards.
    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
        Self {
            extend_by_ed25519_id,
            ccontrol,
            n_incoming_cells_permitted: None,
            n_outgoing_cells_permitted: None,
        }
    }
}
614
615impl ClientCirc {
616    /// Return a description of the first hop of this circuit.
617    ///
618    /// # Panics
619    ///
620    /// Panics if there is no first hop.  (This should be impossible outside of
621    /// the tor-proto crate, but within the crate it's possible to have a
622    /// circuit with no hops.)
623    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
624        Ok(self
625            .mutable
626            .first_hop(self.unique_id)
627            .map_err(|_| Error::CircuitClosed)?
628            .expect("called first_hop on an un-constructed circuit"))
629    }
630
631    /// Return a description of the last hop of the circuit.
632    ///
633    /// Return None if the last hop is virtual.
634    ///
635    /// See caveats on [`ClientCirc::last_hop_num()`].
636    ///
637    /// # Panics
638    ///
639    /// Panics if there is no last hop.  (This should be impossible outside of
640    /// the tor-proto crate, but within the crate it's possible to have a
641    /// circuit with no hops.)
642    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
643        let path = self.path_ref()?;
644        Ok(path
645            .hops()
646            .last()
647            .expect("Called last_hop an an un-constructed circuit")
648            .as_chan_target()
649            .map(OwnedChanTarget::from_chan_target))
650    }
651
652    /// Return the [`HopNum`] of the last hop of this circuit.
653    ///
654    /// Returns an error if there is no last hop.  (This should be impossible outside of the
655    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
656    ///
657    /// NOTE: This function will return the [`HopNum`] of the hop
658    /// that is _currently_ the last. If there is an extend operation in progress,
659    /// the currently pending hop may or may not be counted, depending on whether
660    /// the extend operation finishes before this call is done.
661    pub fn last_hop_num(&self) -> Result<HopNum> {
662        Ok(self
663            .mutable
664            .last_hop_num(self.unique_id)?
665            .ok_or_else(|| internal!("no last hop index"))?)
666    }
667
668    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
669    /// HopLocation with its id and hop number.
670    ///
671    /// Return an error if there is no last hop.
672    pub fn last_hop(&self) -> Result<TargetHop> {
673        let hop_num = self
674            .mutable
675            .last_hop_num(self.unique_id)?
676            .ok_or_else(|| bad_api_usage!("no last hop"))?;
677        Ok((self.unique_id, hop_num).into())
678    }
679
680    /// Return a [`Path`] object describing all the hops in this circuit.
681    ///
682    /// Note that this `Path` is not automatically updated if the circuit is
683    /// extended.
684    pub fn path_ref(&self) -> Result<Arc<Path>> {
685        self.mutable
686            .path_ref(self.unique_id)
687            .map_err(|_| Error::CircuitClosed)
688    }
689
690    /// Get the clock skew claimed by the first hop of the circuit.
691    ///
692    /// See [`Channel::clock_skew()`].
693    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
694        let (tx, rx) = oneshot::channel();
695
696        self.control
697            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
698            .map_err(|_| Error::CircuitClosed)?;
699
700        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
701    }
702
    /// Return a reference to this circuit's memory quota account.
    ///
    /// The returned reference borrows from this `ClientCirc`.
    pub fn mq_account(&self) -> &CircuitAccount {
        &self.memquota
    }
707
708    /// Return the cryptographic material used to prove knowledge of a shared
709    /// secret with with `hop`.
710    ///
711    /// See [`CircuitBinding`] for more information on how this is used.
712    ///
713    /// Return None if we have no circuit binding information for the hop, or if
714    /// the hop does not exist.
715    #[cfg(feature = "hs-service")]
716    pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
717        let (sender, receiver) = oneshot::channel();
718        let msg = CtrlCmd::GetBindingKey { hop, done: sender };
719        self.command
720            .unbounded_send(msg)
721            .map_err(|_| Error::CircuitClosed)?;
722
723        receiver.await.map_err(|_| Error::CircuitClosed)?
724    }
725
726    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
727    ///
728    /// To use this:
729    ///
730    ///  0. Create an inter-task channel you'll use to receive
731    ///     the outcome of your conversation,
732    ///     and bundle it into a [`MsgHandler`].
733    ///
734    ///  1. Call `start_conversation`.
    ///     This will install your handler for incoming messages,
736    ///     and send the outgoing message (if you provided one).
737    ///     After that, each message on the circuit
738    ///     that isn't handled by the core machinery
739    ///     is passed to your provided `reply_handler`.
740    ///
741    ///  2. Possibly call `send_msg` on the [`Conversation`],
742    ///     from the call site of `start_conversation`,
743    ///     possibly multiple times, from time to time,
744    ///     to send further desired messages to the peer.
745    ///
746    ///  3. In your [`MsgHandler`], process the incoming messages.
747    ///     You may respond by
    ///     sending additional messages.
749    ///     When the protocol exchange is finished,
750    ///     `MsgHandler::handle_msg` should return
751    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
752    ///
753    /// If you don't need the `Conversation` to send followup messages,
754    /// you may simply drop it,
755    /// and rely on the responses you get from your handler,
756    /// on the channel from step 0 above.
757    /// Your handler will remain installed and able to process incoming messages
758    /// until it returns `ConversationFinished`.
759    ///
760    /// (If you don't want to accept any replies at all, it may be
761    /// simpler to use [`ClientCirc::send_raw_msg`].)
762    ///
763    /// Note that it is quite possible to use this function to violate the tor
764    /// protocol; most users of this API will not need to call it.  It is used
765    /// to implement most of the onion service handshake.
766    ///
767    /// # Limitations
768    ///
769    /// Only one conversation may be active at any one time,
770    /// for any one circuit.
771    /// This generally means that this function should not be called
772    /// on a circuit which might be shared with anyone else.
773    ///
774    /// Likewise, it is forbidden to try to extend the circuit,
775    /// while the conversation is in progress.
776    ///
777    /// After the conversation has finished, the circuit may be extended.
778    /// Or, `start_conversation` may be called again;
779    /// but, in that case there will be a gap between the two conversations,
780    /// during which no `MsgHandler` is installed,
781    /// and unexpected incoming messages would close the circuit.
782    ///
783    /// If these restrictions are violated, the circuit will be closed with an error.
784    ///
785    /// ## Precise definition of the lifetime of a conversation
786    ///
787    /// A conversation is in progress from entry to `start_conversation`,
788    /// until entry to the body of the [`MsgHandler::handle_msg`]
789    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
790    /// (*Entry* since `handle_msg` is synchronously embedded
791    /// into the incoming message processing.)
792    /// So you may start a new conversation as soon as you have the final response
793    /// via your inter-task channel from (0) above.
794    ///
795    /// The lifetime relationship of the [`Conversation`],
796    /// vs the handler returning `ConversationFinished`
797    /// is not enforced by the type system.
798    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
799    // at least while allowing sending followup messages from outside the handler.
800    //
801    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
802    //   tor-proto interface.
803    #[cfg(feature = "send-control-msg")]
804    pub async fn start_conversation(
805        &self,
806        msg: Option<relaycell::msg::AnyRelayMsg>,
807        reply_handler: impl MsgHandler + Send + 'static,
808        hop: TargetHop,
809    ) -> Result<Conversation<'_>> {
810        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
811        // the right Leg/Hop with inbound cell.
812        let (sender, receiver) = oneshot::channel();
813        self.command
814            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
815            .map_err(|_| Error::CircuitClosed)?;
816        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
817        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
818        let conversation = Conversation(self);
819        conversation.send_internal(msg, Some(handler)).await?;
820        Ok(conversation)
821    }
822
823    /// Send an ad-hoc message to a given hop on the circuit, without expecting
824    /// a reply.
825    ///
826    /// (If you want to handle one or more possible replies, see
827    /// [`ClientCirc::start_conversation`].)
828    #[cfg(feature = "send-control-msg")]
829    pub async fn send_raw_msg(
830        &self,
831        msg: relaycell::msg::AnyRelayMsg,
832        hop: TargetHop,
833    ) -> Result<()> {
834        let (sender, receiver) = oneshot::channel();
835        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
836        self.control
837            .unbounded_send(ctrl_msg)
838            .map_err(|_| Error::CircuitClosed)?;
839
840        receiver.await.map_err(|_| Error::CircuitClosed)?
841    }
842
843    /// Tell this circuit to begin allowing the final hop of the circuit to try
844    /// to create new Tor streams, and to return those pending requests in an
845    /// asynchronous stream.
846    ///
847    /// Ordinarily, these requests are rejected.
848    ///
849    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
850    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
851    /// an error.
852    ///
853    /// After this method has been called on a circuit, the circuit is expected
854    /// to receive requests of this type indefinitely, until it is finally closed.
855    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
856    ///
857    /// Only onion services (and eventually) exit relays should call this
858    /// method.
859    //
860    // TODO: Someday, we might want to allow a stream request handler to be
861    // un-registered.  However, nothing in the Tor protocol requires it.
862    #[cfg(feature = "hs-service")]
863    pub async fn allow_stream_requests(
864        self: &Arc<ClientCirc>,
865        allow_commands: &[relaycell::RelayCmd],
866        hop: TargetHop,
867        filter: impl crate::stream::IncomingStreamRequestFilter,
868    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
869        use futures::stream::StreamExt;
870
871        /// The size of the channel receiving IncomingStreamRequestContexts.
872        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
873
874        // TODO(#2002): support onion service conflux
875        let circ_count = self.mutable.n_legs();
876        if circ_count != 1 {
877            return Err(
878                internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
879            );
880        }
881
882        let time_prov = self.time_provider.clone();
883        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
884        let (incoming_sender, incoming_receiver) =
885            MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
886        let (tx, rx) = oneshot::channel();
887
888        self.command
889            .unbounded_send(CtrlCmd::AwaitStreamRequest {
890                cmd_checker,
891                incoming_sender,
892                hop,
893                done: tx,
894                filter: Box::new(filter),
895            })
896            .map_err(|_| Error::CircuitClosed)?;
897
898        // Check whether the AwaitStreamRequest was processed successfully.
899        rx.await.map_err(|_| Error::CircuitClosed)??;
900
901        let allowed_hop_loc = match hop {
902            TargetHop::Hop(loc) => Some(loc),
903            _ => None,
904        }
905        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
906
907        let circ = Arc::clone(self);
908        Ok(incoming_receiver.map(move |req_ctx| {
909            let StreamReqInfo {
910                req,
911                stream_id,
912                hop,
913                receiver,
914                msg_tx,
915                rate_limit_stream,
916                drain_rate_request_stream,
917                memquota,
918                relay_cell_format,
919            } = req_ctx;
920
921            // We already enforce this in handle_incoming_stream_request; this
922            // assertion is just here to make sure that we don't ever
923            // accidentally remove or fail to enforce that check, since it is
924            // security-critical.
925            assert_eq!(allowed_hop_loc, hop);
926
927            // TODO(#2002): figure out what this is going to look like
928            // for onion services (perhaps we should forbid this function
929            // from being called on a multipath circuit?)
930            //
931            // See also:
932            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
933            let target = StreamTarget {
934                circ: Arc::clone(&circ),
935                tx: msg_tx,
936                hop: allowed_hop_loc,
937                stream_id,
938                relay_cell_format,
939                rate_limit_stream,
940            };
941
942            // can be used to build a reader that supports XON/XOFF flow control
943            let xon_xoff_reader_ctrl =
944                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
945
946            let reader = StreamReceiver {
947                target: target.clone(),
948                receiver,
949                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
950                ended: false,
951            };
952
953            let components = StreamComponents {
954                stream_receiver: reader,
955                target,
956                memquota,
957                xon_xoff_reader_ctrl,
958            };
959
960            IncomingStream::new(circ.time_provider.clone(), req, components)
961        }))
962    }
963
964    /// Extend the circuit, via the most appropriate circuit extension handshake,
965    /// to the chosen `target` hop.
966    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
967    where
968        Tg: CircTarget,
969    {
970        // For now we use the simplest decision-making mechanism:
971        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
972        //
973        // This behavior is slightly different from C tor, which uses ntor v3
974        // only whenever it want to send any extension in the circuit message.
975        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
976        // want to use an extension if we can, and so it doesn't make too much
977        // sense to detect the case where we have no extensions.
978        //
979        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
980        // on the tor network, and so we cannot simply assume that everybody has it.)
981        if target
982            .protovers()
983            .supports_named_subver(named::RELAY_NTORV3)
984        {
985            self.extend_ntor_v3(target, params).await
986        } else {
987            self.extend_ntor(target, params).await
988        }
989    }
990
991    /// Extend the circuit via the ntor handshake to a new target last
992    /// hop.
993    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
994    where
995        Tg: CircTarget,
996    {
997        let key = NtorPublicKey {
998            id: *target
999                .rsa_identity()
1000                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1001            pk: *target.ntor_onion_key(),
1002        };
1003        let mut linkspecs = target
1004            .linkspecs()
1005            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
1006        if !params.extend_by_ed25519_id {
1007            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
1008        }
1009
1010        let (tx, rx) = oneshot::channel();
1011
1012        let peer_id = OwnedChanTarget::from_chan_target(target);
1013        let settings = HopSettings::from_params_and_caps(
1014            HopNegotiationType::None,
1015            &params,
1016            target.protovers(),
1017        )?;
1018        self.control
1019            .unbounded_send(CtrlMsg::ExtendNtor {
1020                peer_id,
1021                public_key: key,
1022                linkspecs,
1023                settings,
1024                done: tx,
1025            })
1026            .map_err(|_| Error::CircuitClosed)?;
1027
1028        rx.await.map_err(|_| Error::CircuitClosed)??;
1029
1030        Ok(())
1031    }
1032
1033    /// Extend the circuit via the ntor handshake to a new target last
1034    /// hop.
1035    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
1036    where
1037        Tg: CircTarget,
1038    {
1039        let key = NtorV3PublicKey {
1040            id: *target
1041                .ed_identity()
1042                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1043            pk: *target.ntor_onion_key(),
1044        };
1045        let mut linkspecs = target
1046            .linkspecs()
1047            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
1048        if !params.extend_by_ed25519_id {
1049            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
1050        }
1051
1052        let (tx, rx) = oneshot::channel();
1053
1054        let peer_id = OwnedChanTarget::from_chan_target(target);
1055        let settings = HopSettings::from_params_and_caps(
1056            HopNegotiationType::Full,
1057            &params,
1058            target.protovers(),
1059        )?;
1060        self.control
1061            .unbounded_send(CtrlMsg::ExtendNtorV3 {
1062                peer_id,
1063                public_key: key,
1064                linkspecs,
1065                settings,
1066                done: tx,
1067            })
1068            .map_err(|_| Error::CircuitClosed)?;
1069
1070        rx.await.map_err(|_| Error::CircuitClosed)??;
1071
1072        Ok(())
1073    }
1074
1075    /// Extend this circuit by a single, "virtual" hop.
1076    ///
1077    /// A virtual hop is one for which we do not add an actual network connection
1078    /// between separate hosts (such as Relays).  We only add a layer of
1079    /// cryptography.
1080    ///
1081    /// This is used to implement onion services: the client and the service
1082    /// both build a circuit to a single rendezvous point, and tell the
1083    /// rendezvous point to relay traffic between their two circuits.  Having
1084    /// completed a [`handshake`] out of band[^1], the parties each extend their
1085    /// circuits by a single "virtual" encryption hop that represents their
1086    /// shared cryptographic context.
1087    ///
1088    /// Once a circuit has been extended in this way, it is an error to try to
1089    /// extend it in any other way.
1090    ///
1091    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
1092    ///     client sends their half of the handshake in an ` message, and the
1093    ///     service's response is inline in its `RENDEZVOUS2` message.
1094    //
1095    // TODO hs: let's try to enforce the "you can't extend a circuit again once
1096    // it has been extended this way" property.  We could do that with internal
1097    // state, or some kind of a type state pattern.
1098    #[cfg(feature = "hs-common")]
1099    pub async fn extend_virtual(
1100        &self,
1101        protocol: handshake::RelayProtocol,
1102        role: handshake::HandshakeRole,
1103        seed: impl handshake::KeyGenerator,
1104        params: &CircParameters,
1105        capabilities: &tor_protover::Protocols,
1106    ) -> Result<()> {
1107        use self::handshake::BoxedClientLayer;
1108
1109        // TODO CGO: Possibly refactor this match into a separate method when we revisit this.
1110        let negotiation_type = match protocol {
1111            handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
1112        };
1113        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
1114
1115        let BoxedClientLayer { fwd, back, binding } =
1116            protocol.construct_client_layers(role, seed)?;
1117
1118        let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
1119        let (tx, rx) = oneshot::channel();
1120        let message = CtrlCmd::ExtendVirtual {
1121            cell_crypto: (fwd, back, binding),
1122            settings,
1123            done: tx,
1124        };
1125
1126        self.command
1127            .unbounded_send(message)
1128            .map_err(|_| Error::CircuitClosed)?;
1129
1130        rx.await.map_err(|_| Error::CircuitClosed)?
1131    }
1132
1133    /// Helper, used to begin a stream.
1134    ///
1135    /// This function allocates a stream ID, and sends the message
1136    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
1137    ///
1138    /// The caller will typically want to see the first cell in response,
1139    /// to see whether it is e.g. an END or a CONNECTED.
1140    async fn begin_stream_impl(
1141        self: &Arc<ClientCirc>,
1142        begin_msg: AnyRelayMsg,
1143        cmd_checker: AnyCmdChecker,
1144    ) -> Result<StreamComponents> {
1145        // TODO: Possibly this should take a hop, rather than just
1146        // assuming it's the last hop.
1147        let hop = TargetHop::LastHop;
1148
1149        let time_prov = self.time_provider.clone();
1150
1151        let memquota = StreamAccount::new(self.mq_account())?;
1152        let (sender, receiver) = stream_queue(STREAM_READER_BUFFER, &memquota, &time_prov)?;
1153
1154        let (tx, rx) = oneshot::channel();
1155        let (msg_tx, msg_rx) =
1156            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
1157
1158        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
1159
1160        // A channel for the reactor to request a new drain rate from the reader.
1161        // Typically this notification will be sent after an XOFF is sent so that the reader can
1162        // send us a new drain rate when the stream data queue becomes empty.
1163        let mut drain_rate_request_tx = NotifySender::new_typed();
1164        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
1165
1166        self.control
1167            .unbounded_send(CtrlMsg::BeginStream {
1168                hop,
1169                message: begin_msg,
1170                sender,
1171                rx: msg_rx,
1172                rate_limit_notifier: rate_limit_tx,
1173                drain_rate_requester: drain_rate_request_tx,
1174                done: tx,
1175                cmd_checker,
1176            })
1177            .map_err(|_| Error::CircuitClosed)?;
1178
1179        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
1180
1181        let target = StreamTarget {
1182            circ: self.clone(),
1183            tx: msg_tx,
1184            hop,
1185            stream_id,
1186            relay_cell_format,
1187            rate_limit_stream: rate_limit_rx,
1188        };
1189
1190        // can be used to build a reader that supports XON/XOFF flow control
1191        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
1192
1193        let stream_receiver = StreamReceiver {
1194            target: target.clone(),
1195            receiver,
1196            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
1197            ended: false,
1198        };
1199
1200        let components = StreamComponents {
1201            stream_receiver,
1202            target,
1203            memquota,
1204            xon_xoff_reader_ctrl,
1205        };
1206
1207        Ok(components)
1208    }
1209
1210    /// Start a DataStream (anonymized connection) to the given
1211    /// address and port, using a BEGIN cell.
1212    async fn begin_data_stream(
1213        self: &Arc<ClientCirc>,
1214        msg: AnyRelayMsg,
1215        optimistic: bool,
1216    ) -> Result<DataStream> {
1217        let components = self
1218            .begin_stream_impl(msg, DataCmdChecker::new_any())
1219            .await?;
1220
1221        let StreamComponents {
1222            stream_receiver,
1223            target,
1224            memquota,
1225            xon_xoff_reader_ctrl,
1226        } = components;
1227
1228        let mut stream = DataStream::new(
1229            self.time_provider.clone(),
1230            stream_receiver,
1231            xon_xoff_reader_ctrl,
1232            target,
1233            memquota,
1234        );
1235        if !optimistic {
1236            stream.wait_for_connection().await?;
1237        }
1238        Ok(stream)
1239    }
1240
1241    /// Start a stream to the given address and port, using a BEGIN
1242    /// cell.
1243    ///
1244    /// The use of a string for the address is intentional: you should let
1245    /// the remote Tor relay do the hostname lookup for you.
1246    pub async fn begin_stream(
1247        self: &Arc<ClientCirc>,
1248        target: &str,
1249        port: u16,
1250        parameters: Option<StreamParameters>,
1251    ) -> Result<DataStream> {
1252        let parameters = parameters.unwrap_or_default();
1253        let begin_flags = parameters.begin_flags();
1254        let optimistic = parameters.is_optimistic();
1255        let target = if parameters.suppressing_hostname() {
1256            ""
1257        } else {
1258            target
1259        };
1260        let beginmsg = Begin::new(target, port, begin_flags)
1261            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
1262        self.begin_data_stream(beginmsg.into(), optimistic).await
1263    }
1264
1265    /// Start a new stream to the last relay in the circuit, using
1266    /// a BEGIN_DIR cell.
1267    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
1268        // Note that we always open begindir connections optimistically.
1269        // Since they are local to a relay that we've already authenticated
1270        // with and built a circuit to, there should be no additional checks
1271        // we need to perform to see whether the BEGINDIR will succeed.
1272        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
1273            .await
1274    }
1275
1276    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
1277    /// in this circuit.
1278    ///
1279    /// Note that this function does not check for timeouts; that's
1280    /// the caller's responsibility.
1281    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
1282        let resolve_msg = Resolve::new(hostname);
1283
1284        let resolved_msg = self.try_resolve(resolve_msg).await?;
1285
1286        resolved_msg
1287            .into_answers()
1288            .into_iter()
1289            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1290                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1291                Ok(_) => None,
1292                Err(e) => Some(Err(e)),
1293            })
1294            .collect()
1295    }
1296
1297    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
1298    /// the last relay on this circuit.
1299    ///
1300    /// Note that this function does not check for timeouts; that's
1301    /// the caller's responsibility.
1302    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1303        let resolve_ptr_msg = Resolve::new_reverse(&addr);
1304
1305        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1306
1307        resolved_msg
1308            .into_answers()
1309            .into_iter()
1310            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1311                Ok(ResolvedVal::Hostname(v)) => Some(
1312                    String::from_utf8(v)
1313                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1314                ),
1315                Ok(_) => None,
1316                Err(e) => Some(Err(e)),
1317            })
1318            .collect()
1319    }
1320
1321    /// Helper: Send the resolve message, and read resolved message from
1322    /// resolve stream.
1323    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1324        let components = self
1325            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1326            .await?;
1327
1328        let StreamComponents {
1329            stream_receiver,
1330            target: _,
1331            memquota,
1332            xon_xoff_reader_ctrl: _,
1333        } = components;
1334
1335        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
1336        resolve_stream.read_msg().await
1337    }
1338
    /// Shut down this circuit, along with all streams that are using it.
    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
    /// immediately after this function returns!).
    ///
    /// Note that other references to this circuit may exist.  If they
    /// do, they will stop working after you call this function.
    ///
    /// It's not necessary to call this method if you're just done
    /// with a circuit: the circuit should close on its own once nothing
    /// is using it any more.
    pub fn terminate(&self) {
        // Best effort: a failure to enqueue the shutdown command is
        // deliberately ignored.
        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
    }
1352
    /// Called when a circuit-level protocol error has occurred and the
    /// circuit needs to shut down.
    ///
    /// This is a separate function because we may eventually want to have
    /// it do more than just shut down.
    ///
    /// As with `terminate`, this function is asynchronous.
    pub(crate) fn protocol_error(&self) {
        // Currently identical to an ordinary shutdown request.
        self.terminate();
    }
1363
    /// Return true if this circuit is closed and therefore unusable.
    ///
    /// This reports whether the circuit's control channel has been closed.
    pub fn is_closing(&self) -> bool {
        self.control.is_closed()
    }
1368
    /// Return a process-unique identifier for this circuit.
    ///
    /// The identifier is returned by value.
    pub fn unique_id(&self) -> UniqId {
        self.unique_id
    }
1373
1374    /// Return the number of hops in this circuit.
1375    ///
1376    /// NOTE: This function will currently return only the number of hops
1377    /// _currently_ in the circuit. If there is an extend operation in progress,
1378    /// the currently pending hop may or may not be counted, depending on whether
1379    /// the extend operation finishes before this call is done.
1380    pub fn n_hops(&self) -> Result<usize> {
1381        self.mutable
1382            .n_hops(self.unique_id)
1383            .map_err(|_| Error::CircuitClosed)
1384    }
1385
    /// Return a future that will resolve once this circuit has closed.
    ///
    /// Note that this method does not itself cause the circuit to shut down.
    ///
    /// The returned future is built from a clone of this circuit's
    /// "reactor closed" receiver, with its output discarded.
    ///
    /// TODO: Perhaps this should return some kind of status indication instead
    /// of just ()
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        self.reactor_closed_rx.clone().map(|_| ())
    }
1396}
1397
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientCirc::start_conversation`],
/// and used to send messages to the last hop relay.
///
/// The handle borrows the [`ClientCirc`] it was created from,
/// and so cannot outlive that borrow.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
1408
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `MsgHandler` set up
    /// when the `Conversation` was created.
    pub async fn send_message(&self, msg: relaycell::msg::AnyRelayMsg) -> Result<()> {
        // No handler here: the one installed at creation time stays in place.
        let no_handler = None;
        self.send_internal(Some(msg), no_handler).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`.
    ///
    /// Either of `msg` and `handler` may be `None`, in which case the
    /// corresponding field of the control message is left empty.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        // The message is wrapped with no stream ID.
        let msg = msg.map(|inner| relaycell::AnyRelayMsgOuter::new(None, inner));

        let (sender, outcome) = oneshot::channel();
        let circ = self.0;
        circ.control
            .unbounded_send(CtrlMsg::SendMsgAndInstallHandler {
                msg,
                handler,
                sender,
            })
            .map_err(|_| Error::CircuitClosed)?;

        match outcome.await {
            Ok(sent) => sent,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
1444
1445impl PendingClientCirc {
1446    /// Instantiate a new circuit object: used from Channel::new_circ().
1447    ///
1448    /// Does not send a CREATE* cell on its own.
1449    ///
1450    ///
1451    pub(crate) fn new(
1452        id: CircId,
1453        channel: Arc<Channel>,
1454        createdreceiver: oneshot::Receiver<CreateResponse>,
1455        input: CircuitRxReceiver,
1456        unique_id: UniqId,
1457        runtime: DynTimeProvider,
1458        memquota: CircuitAccount,
1459    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1460        let time_provider = channel.time_provider().clone();
1461        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1462            Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1463
1464        let circuit = ClientCirc {
1465            mutable,
1466            unique_id,
1467            control: control_tx,
1468            command: command_tx,
1469            reactor_closed_rx: reactor_closed_rx.shared(),
1470            #[cfg(test)]
1471            circid: id,
1472            memquota,
1473            time_provider,
1474        };
1475
1476        let pending = PendingClientCirc {
1477            recvcreated: createdreceiver,
1478            circ: Arc::new(circuit),
1479        };
1480        (pending, reactor)
1481    }
1482
    /// Extract the process-unique identifier for this pending circuit.
    ///
    /// This is the `unique_id` of the underlying circuit object.
    pub fn peek_unique_id(&self) -> UniqId {
        self.circ.unique_id
    }
1487
1488    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1489    /// first hop of this circuit.
1490    ///
1491    /// There's no authentication in CRATE_FAST,
1492    /// so we don't need to know whom we're connecting to: we're just
1493    /// connecting to whichever relay the channel is for.
1494    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1495        // We no nothing about this relay, so we assume it supports no protocol capabilities at all.
1496        //
1497        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
1498        let protocols = tor_protover::Protocols::new();
1499        let settings =
1500            HopSettings::from_params_and_caps(HopNegotiationType::None, &params, &protocols)?;
1501        let (tx, rx) = oneshot::channel();
1502        self.circ
1503            .control
1504            .unbounded_send(CtrlMsg::Create {
1505                recv_created: self.recvcreated,
1506                handshake: CircuitHandshake::CreateFast,
1507                settings,
1508                done: tx,
1509            })
1510            .map_err(|_| Error::CircuitClosed)?;
1511
1512        rx.await.map_err(|_| Error::CircuitClosed)??;
1513
1514        Ok(self.circ)
1515    }
1516
1517    /// Use the most appropriate handshake to connect to the first hop of this circuit.
1518    ///
1519    /// Note that the provided 'target' must match the channel's target,
1520    /// or the handshake will fail.
1521    pub async fn create_firsthop<Tg>(
1522        self,
1523        target: &Tg,
1524        params: CircParameters,
1525    ) -> Result<Arc<ClientCirc>>
1526    where
1527        Tg: tor_linkspec::CircTarget,
1528    {
1529        // (See note in ClientCirc::extend.)
1530        if target
1531            .protovers()
1532            .supports_named_subver(named::RELAY_NTORV3)
1533        {
1534            self.create_firsthop_ntor_v3(target, params).await
1535        } else {
1536            self.create_firsthop_ntor(target, params).await
1537        }
1538    }
1539
1540    /// Use the ntor handshake to connect to the first hop of this circuit.
1541    ///
1542    /// Note that the provided 'target' must match the channel's target,
1543    /// or the handshake will fail.
1544    pub async fn create_firsthop_ntor<Tg>(
1545        self,
1546        target: &Tg,
1547        params: CircParameters,
1548    ) -> Result<Arc<ClientCirc>>
1549    where
1550        Tg: tor_linkspec::CircTarget,
1551    {
1552        let (tx, rx) = oneshot::channel();
1553        let settings = HopSettings::from_params_and_caps(
1554            HopNegotiationType::None,
1555            &params,
1556            target.protovers(),
1557        )?;
1558
1559        self.circ
1560            .control
1561            .unbounded_send(CtrlMsg::Create {
1562                recv_created: self.recvcreated,
1563                handshake: CircuitHandshake::Ntor {
1564                    public_key: NtorPublicKey {
1565                        id: *target
1566                            .rsa_identity()
1567                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1568                        pk: *target.ntor_onion_key(),
1569                    },
1570                    ed_identity: *target
1571                        .ed_identity()
1572                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1573                },
1574                settings,
1575                done: tx,
1576            })
1577            .map_err(|_| Error::CircuitClosed)?;
1578
1579        rx.await.map_err(|_| Error::CircuitClosed)??;
1580
1581        Ok(self.circ)
1582    }
1583
1584    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
1585    ///
1586    /// Assumes that the target supports ntor_v3. The caller should verify
1587    /// this before calling this function, e.g. by validating that the target
1588    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
1589    ///
1590    /// Note that the provided 'target' must match the channel's target,
1591    /// or the handshake will fail.
1592    pub async fn create_firsthop_ntor_v3<Tg>(
1593        self,
1594        target: &Tg,
1595        params: CircParameters,
1596    ) -> Result<Arc<ClientCirc>>
1597    where
1598        Tg: tor_linkspec::CircTarget,
1599    {
1600        let settings = HopSettings::from_params_and_caps(
1601            HopNegotiationType::Full,
1602            &params,
1603            target.protovers(),
1604        )?;
1605        let (tx, rx) = oneshot::channel();
1606
1607        self.circ
1608            .control
1609            .unbounded_send(CtrlMsg::Create {
1610                recv_created: self.recvcreated,
1611                handshake: CircuitHandshake::NtorV3 {
1612                    public_key: NtorV3PublicKey {
1613                        id: *target
1614                            .ed_identity()
1615                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1616                        pk: *target.ntor_onion_key(),
1617                    },
1618                },
1619                settings,
1620                done: tx,
1621            })
1622            .map_err(|_| Error::CircuitClosed)?;
1623
1624        rx.await.map_err(|_| Error::CircuitClosed)??;
1625
1626        Ok(self.circ)
1627    }
1628}
1629
/// A collection of components that can be combined to implement a Tor stream,
/// or anything else that requires a stream ID.
///
/// Not all components may be needed, depending on the purpose of the "stream".
/// For example, we build `RELAY_RESOLVE` requests the same way we build data
/// streams, but they won't use the `StreamTarget`, as they don't need to send
/// additional messages.
#[derive(Debug)]
pub(crate) struct StreamComponents {
    /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
    pub(crate) stream_receiver: StreamReceiver,
    /// A handle that can communicate messages to the circuit reactor for this stream.
    pub(crate) target: StreamTarget,
    /// The memquota [account](tor_memquota::Account) to use for data on this stream.
    pub(crate) memquota: StreamAccount,
    /// The control information needed to add XON/XOFF flow control to the stream.
    pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
}
1648
1649/// Convert a [`ResolvedVal`] into a Result, based on whether or not
1650/// it represents an error.
1651fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1652    match val {
1653        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1654        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1655        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1656        _ => Ok(val),
1657    }
1658}
1659
1660#[cfg(test)]
1661pub(crate) mod test {
1662    // @@ begin test lint list maintained by maint/add_warning @@
1663    #![allow(clippy::bool_assert_comparison)]
1664    #![allow(clippy::clone_on_copy)]
1665    #![allow(clippy::dbg_macro)]
1666    #![allow(clippy::mixed_attributes_style)]
1667    #![allow(clippy::print_stderr)]
1668    #![allow(clippy::print_stdout)]
1669    #![allow(clippy::single_char_pattern)]
1670    #![allow(clippy::unwrap_used)]
1671    #![allow(clippy::unchecked_duration_subtraction)]
1672    #![allow(clippy::useless_vec)]
1673    #![allow(clippy::needless_pass_by_value)]
1674    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1675
1676    use super::*;
1677    use crate::channel::OpenChanCellS2C;
1678    use crate::channel::{test::new_reactor, CodecError};
1679    use crate::congestion::test_utils::params::build_cc_vegas_params;
1680    use crate::crypto::cell::RelayCellBody;
1681    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1682    #[cfg(feature = "hs-service")]
1683    use crate::stream::IncomingStreamRequestFilter;
1684    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1685    use futures::channel::mpsc::{Receiver, Sender};
1686    use futures::io::{AsyncReadExt, AsyncWriteExt};
1687    use futures::sink::SinkExt;
1688    use futures::stream::StreamExt;
1689    use futures::task::SpawnExt;
1690    use hex_literal::hex;
1691    use std::collections::{HashMap, VecDeque};
1692    use std::fmt::Debug;
1693    use std::time::Duration;
1694    use tor_basic_utils::test_rng::testing_rng;
1695    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
1696    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1697    use tor_cell::relaycell::msg::SendmeTag;
1698    use tor_cell::relaycell::{
1699        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1700    };
1701    use tor_linkspec::OwnedCircTarget;
1702    use tor_memquota::HasMemoryCost;
1703    use tor_rtcompat::Runtime;
1704    use tracing::trace;
1705    use tracing_test::traced_test;
1706
1707    #[cfg(feature = "conflux")]
1708    use {
1709        crate::tunnel::reactor::ConfluxHandshakeResult,
1710        crate::util::err::ConfluxHandshakeError,
1711        futures::future::FusedFuture,
1712        futures::lock::Mutex as AsyncMutex,
1713        std::pin::Pin,
1714        std::result::Result as StdResult,
1715        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1716        tor_cell::relaycell::msg::ConfluxLink,
1717        tor_rtmock::MockRuntime,
1718    };
1719
1720    impl PendingClientCirc {
1721        /// Testing only: Extract the circuit ID for this pending circuit.
1722        pub(crate) fn peek_circid(&self) -> CircId {
1723            self.circ.circid
1724        }
1725    }
1726
1727    impl ClientCirc {
1728        /// Testing only: Extract the circuit ID of this circuit.
1729        pub(crate) fn peek_circid(&self) -> CircId {
1730            self.circid
1731        }
1732    }
1733
1734    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1735        // TODO #1947: test other formats.
1736        let rfmt = RelayCellFormat::V0;
1737        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1738            .encode(rfmt, &mut testing_rng())
1739            .unwrap();
1740        let chanmsg = chanmsg::Relay::from(body);
1741        ClientCircChanMsg::Relay(chanmsg)
1742    }
1743
    // Example relay IDs and keys, shared by the example target and the
    // example ntor / ntor_v3 secret keys built by the helpers in this module.

    /// Example secret key given to the example ntor and ntor_v3 secret-key
    /// constructors (presumably the private half of `EXAMPLE_PK`).
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Example public key, used as the ntor onion key of the example target.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Example Ed25519 identity: 32 bytes, all set to 6.
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// Example RSA identity: 20 bytes, all set to 10.
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1751
    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing.
    ///
    /// This is a thin wrapper that forwards `buffer` unchanged to
    /// `crate::fake_mpsc` (presumably the queue's capacity; confirm there).
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
1759
1760    /// return an example OwnedCircTarget that can get used for an ntor handshake.
1761    fn example_target() -> OwnedCircTarget {
1762        let mut builder = OwnedCircTarget::builder();
1763        builder
1764            .chan_target()
1765            .ed_identity(EXAMPLE_ED_ID.into())
1766            .rsa_identity(EXAMPLE_RSA_ID.into());
1767        builder
1768            .ntor_onion_key(EXAMPLE_PK.into())
1769            .protocols("FlowCtrl=1-2".parse().unwrap())
1770            .build()
1771            .unwrap()
1772    }
1773    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1774        crate::crypto::handshake::ntor::NtorSecretKey::new(
1775            EXAMPLE_SK.into(),
1776            EXAMPLE_PK.into(),
1777            EXAMPLE_RSA_ID.into(),
1778        )
1779    }
1780    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1781        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1782            EXAMPLE_SK.into(),
1783            EXAMPLE_PK.into(),
1784            EXAMPLE_ED_ID.into(),
1785        )
1786    }
1787
1788    fn working_fake_channel<R: Runtime>(
1789        rt: &R,
1790    ) -> (
1791        Arc<Channel>,
1792        Receiver<AnyChanCell>,
1793        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1794    ) {
1795        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1796        rt.spawn(async {
1797            let _ignore = chan_reactor.run().await;
1798        })
1799        .unwrap();
1800        (channel, rx, tx)
1801    }
1802
    /// Which handshake type to use.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// The CREATE_FAST handshake.
        Fast,
        /// The ntor handshake.
        Ntor,
        /// The ntor_v3 handshake.
        NtorV3,
    }
1810
1811    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1812        // We want to try progressing from a pending circuit to a circuit
1813        // via a crate_fast handshake.
1814
1815        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1816
1817        let (chan, mut rx, _sink) = working_fake_channel(rt);
1818        let circid = CircId::new(128).unwrap();
1819        let (created_send, created_recv) = oneshot::channel();
1820        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1821        let unique_id = UniqId::new(23, 17);
1822
1823        let (pending, reactor) = PendingClientCirc::new(
1824            circid,
1825            chan,
1826            created_recv,
1827            circmsg_recv,
1828            unique_id,
1829            DynTimeProvider::new(rt.clone()),
1830            CircuitAccount::new_noop(),
1831        );
1832
1833        rt.spawn(async {
1834            let _ignore = reactor.run().await;
1835        })
1836        .unwrap();
1837
1838        // Future to pretend to be a relay on the other end of the circuit.
1839        let simulate_relay_fut = async move {
1840            let mut rng = testing_rng();
1841            let create_cell = rx.next().await.unwrap();
1842            assert_eq!(create_cell.circid(), Some(circid));
1843            let reply = match handshake_type {
1844                HandshakeType::Fast => {
1845                    let cf = match create_cell.msg() {
1846                        AnyChanMsg::CreateFast(cf) => cf,
1847                        other => panic!("{:?}", other),
1848                    };
1849                    let (_, rep) = CreateFastServer::server(
1850                        &mut rng,
1851                        &mut |_: &()| Some(()),
1852                        &[()],
1853                        cf.handshake(),
1854                    )
1855                    .unwrap();
1856                    CreateResponse::CreatedFast(CreatedFast::new(rep))
1857                }
1858                HandshakeType::Ntor => {
1859                    let c2 = match create_cell.msg() {
1860                        AnyChanMsg::Create2(c2) => c2,
1861                        other => panic!("{:?}", other),
1862                    };
1863                    let (_, rep) = NtorServer::server(
1864                        &mut rng,
1865                        &mut |_: &()| Some(()),
1866                        &[example_ntor_key()],
1867                        c2.body(),
1868                    )
1869                    .unwrap();
1870                    CreateResponse::Created2(Created2::new(rep))
1871                }
1872                HandshakeType::NtorV3 => {
1873                    let c2 = match create_cell.msg() {
1874                        AnyChanMsg::Create2(c2) => c2,
1875                        other => panic!("{:?}", other),
1876                    };
1877                    let mut reply_fn = if with_cc {
1878                        |client_exts: &[CircRequestExt]| {
1879                            let _ = client_exts
1880                                .iter()
1881                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1882                                .expect("Client failed to request CC");
1883                            // This needs to be aligned to test_utils params
1884                            // value due to validation that needs it in range.
1885                            Some(vec![CircResponseExt::CcResponse(
1886                                extend_ext::CcResponse::new(31),
1887                            )])
1888                        }
1889                    } else {
1890                        |_: &_| Some(vec![])
1891                    };
1892                    let (_, rep) = NtorV3Server::server(
1893                        &mut rng,
1894                        &mut reply_fn,
1895                        &[example_ntor_v3_key()],
1896                        c2.body(),
1897                    )
1898                    .unwrap();
1899                    CreateResponse::Created2(Created2::new(rep))
1900                }
1901            };
1902            created_send.send(reply).unwrap();
1903        };
1904        // Future to pretend to be a client.
1905        let client_fut = async move {
1906            let target = example_target();
1907            let params = CircParameters::default();
1908            let ret = match handshake_type {
1909                HandshakeType::Fast => {
1910                    trace!("doing fast create");
1911                    pending.create_firsthop_fast(params).await
1912                }
1913                HandshakeType::Ntor => {
1914                    trace!("doing ntor create");
1915                    pending.create_firsthop_ntor(&target, params).await
1916                }
1917                HandshakeType::NtorV3 => {
1918                    let params = if with_cc {
1919                        // Setup CC vegas parameters.
1920                        CircParameters::new(true, build_cc_vegas_params())
1921                    } else {
1922                        params
1923                    };
1924                    trace!("doing ntor_v3 create");
1925                    pending.create_firsthop_ntor_v3(&target, params).await
1926                }
1927            };
1928            trace!("create done: result {:?}", ret);
1929            ret
1930        };
1931
1932        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1933
1934        let _circ = circ.unwrap();
1935
1936        // pfew!  We've build a circuit!  Let's make sure it has one hop.
1937        assert_eq!(_circ.n_hops().unwrap(), 1);
1938    }
1939
    /// Run `test_create` with the CREATE_FAST handshake (no congestion
    /// control) through `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    /// Run `test_create` with the ntor handshake (no congestion control)
    /// through `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    /// Run `test_create` with the ntor_v3 handshake, without congestion
    /// control, through `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
    /// Run `test_create` with the ntor_v3 handshake and congestion control
    /// enabled, through `test_with_all_runtimes!`.
    ///
    /// Only compiled when the `flowctl-cc` feature is enabled.
    #[traced_test]
    #[test]
    #[cfg(feature = "flowctl-cc")]
    fn test_create_ntor_v3_with_cc() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, true).await;
        });
    }
1969
    // An encryption layer that doesn't do any crypto.   Can be used
    // as inbound or outbound, but not both at once.
    //
    // Instead of real authentication tags it hands out tags derived from a
    // running counter.
    pub(crate) struct DummyCrypto {
        // Buffer holding the most recently produced tag; its first four bytes
        // are overwritten from `counter` each time a tag is generated.
        counter_tag: [u8; 20],
        // Incremented once for every tag produced.
        counter: u32,
        // When true, `decrypt_inbound` yields a tag for every cell;
        // when false it yields `None`.
        lasthop: bool,
    }
1977    impl DummyCrypto {
1978        fn next_tag(&mut self) -> SendmeTag {
1979            #![allow(clippy::identity_op)]
1980            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1981            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1982            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1983            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1984            self.counter += 1;
1985            self.counter_tag.into()
1986        }
1987    }
1988
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        // Leaves the cell untouched and just hands out the next counter-based tag.
        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
            self.next_tag()
        }
        // Deliberately empty: this layer performs no encryption.
        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
    }
1995    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1996        fn decrypt_inbound(
1997            &mut self,
1998            _cmd: ChanCmd,
1999            _cell: &mut RelayCellBody,
2000        ) -> Option<SendmeTag> {
2001            if self.lasthop {
2002                Some(self.next_tag())
2003            } else {
2004                None
2005            }
2006        }
2007    }
2008    impl DummyCrypto {
2009        pub(crate) fn new(lasthop: bool) -> Self {
2010            DummyCrypto {
2011                counter_tag: [0; 20],
2012                counter: 0,
2013                lasthop,
2014            }
2015        }
2016    }
2017
2018    // Helper: set up a 3-hop circuit with no encryption, where the
2019    // next inbound message seems to come from hop next_msg_from
2020    async fn newcirc_ext<R: Runtime>(
2021        rt: &R,
2022        unique_id: UniqId,
2023        chan: Arc<Channel>,
2024        hops: Vec<path::HopDetail>,
2025        next_msg_from: HopNum,
2026        params: CircParameters,
2027    ) -> (Arc<ClientCirc>, CircuitRxSender) {
2028        let circid = CircId::new(128).unwrap();
2029        let (_created_send, created_recv) = oneshot::channel();
2030        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
2031
2032        let (pending, reactor) = PendingClientCirc::new(
2033            circid,
2034            chan,
2035            created_recv,
2036            circmsg_recv,
2037            unique_id,
2038            DynTimeProvider::new(rt.clone()),
2039            CircuitAccount::new_noop(),
2040        );
2041
2042        rt.spawn(async {
2043            let _ignore = reactor.run().await;
2044        })
2045        .unwrap();
2046
2047        let PendingClientCirc {
2048            circ,
2049            recvcreated: _,
2050        } = pending;
2051
2052        // TODO #1067: Support other formats
2053        let relay_cell_format = RelayCellFormat::V0;
2054
2055        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
2056        for (idx, peer_id) in hops.into_iter().enumerate() {
2057            let (tx, rx) = oneshot::channel();
2058            let idx = idx as u8;
2059
2060            circ.command
2061                .unbounded_send(CtrlCmd::AddFakeHop {
2062                    relay_cell_format,
2063                    fwd_lasthop: idx == last_hop_num,
2064                    rev_lasthop: idx == u8::from(next_msg_from),
2065                    peer_id,
2066                    params: params.clone(),
2067                    done: tx,
2068                })
2069                .unwrap();
2070            rx.await.unwrap().unwrap();
2071        }
2072
2073        (circ, circmsg_send)
2074    }
2075
2076    // Helper: set up a 3-hop circuit with no encryption, where the
2077    // next inbound message seems to come from hop next_msg_from
2078    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
2079        let hops = std::iter::repeat_with(|| {
2080            let peer_id = tor_linkspec::OwnedChanTarget::builder()
2081                .ed_identity([4; 32].into())
2082                .rsa_identity([5; 20].into())
2083                .build()
2084                .expect("Could not construct fake hop");
2085
2086            path::HopDetail::Relay(peer_id)
2087        })
2088        .take(3)
2089        .collect();
2090
2091        let unique_id = UniqId::new(23, 17);
2092        newcirc_ext(
2093            rt,
2094            unique_id,
2095            chan,
2096            hops,
2097            2.into(),
2098            CircParameters::default(),
2099        )
2100        .await
2101    }
2102
2103    /// Create `n` distinct [`path::HopDetail`]s,
2104    /// with the specified `start_idx` for the dummy identities.
2105    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
2106        (0..n)
2107            .map(|idx| {
2108                let peer_id = tor_linkspec::OwnedChanTarget::builder()
2109                    .ed_identity([idx + start_idx; 32].into())
2110                    .rsa_identity([idx + start_idx + 1; 20].into())
2111                    .build()
2112                    .expect("Could not construct fake hop");
2113
2114                path::HopDetail::Relay(peer_id)
2115            })
2116            .collect()
2117    }
2118
    /// Extend a fake 3-hop circuit by one more hop and check the result.
    ///
    /// One future asks the circuit to extend to `example_target()` using
    /// `handshake_type`; a second future plays the relay: it reads the
    /// outgoing EXTEND2 message, runs the matching server-side handshake, and
    /// sends back an EXTENDED2 reply.  Afterwards the circuit must report four
    /// hops, the last of which is the example target.
    ///
    /// Panics if called with `HandshakeType::Fast`.
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};

        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let params = CircParameters::default();

        // Client side: request the extension.
        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
            };
            circ // gotta keep the circ alive, or the reactor would exit.
        };
        // Relay side: answer the extend request.
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            // The request is expected to arrive in a RELAY_EARLY cell.
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            // Run the server half of the handshake with the example keys.
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[CircRequestExt]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };

            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
        };

        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);

        // Did we really add another hop?
        assert_eq!(circ.n_hops().unwrap(), 4);

        // Do the path accessors report a reasonable outcome?
        {
            // First via `all_hops()`, keeping only the relay hops.
            let path = circ.path_ref().unwrap();
            let path = path
                .all_hops()
                .filter_map(|hop| match hop {
                    path::HopDetail::Relay(r) => Some(r),
                    #[cfg(feature = "hs-common")]
                    path::HopDetail::Virtual => None,
                })
                .collect::<Vec<_>>();

            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        {
            // Then via `n_hops()` / `hops()`.
            let path = circ.path_ref().unwrap();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
2218
    /// Run `test_extend` with the ntor handshake through `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_extend_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::Ntor).await;
        });
    }
2226
    /// Run `test_extend` with the ntor_v3 handshake through `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_extend_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::NtorV3).await;
        });
    }
2234
    /// Shared driver for the "bad extend" tests.
    ///
    /// Builds a fake 3-hop circuit whose next inbound message seems to come
    /// from `reply_hop`, starts an ntor extend to `example_target()`, and,
    /// from a separately spawned task, injects `bad_reply` as the circuit's
    /// next inbound message after a 100 ms sleep.
    ///
    /// Asserts that the circuit still has three hops and that the extend
    /// failed, then returns the error it failed with.
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: ClientCircChanMsg,
    ) -> Error {
        let (chan, _rx, _sink) = working_fake_channel(rt);
        // Three hops that all share the same dummy identities.
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");

            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();

        let unique_id = UniqId::new(23, 17);
        let (circ, mut sink) = newcirc_ext(
            rt,
            unique_id,
            chan,
            hops,
            reply_hop,
            CircParameters::default(),
        )
        .await;
        let params = CircParameters::default();

        let target = example_target();
        #[allow(clippy::clone_on_copy)]
        let rtc = rt.clone();
        // Deliver the bad reply from a separate task, after a short delay;
        // presumably so that the extend below is already waiting for it.
        let sink_handle = rt
            .spawn_with_handle(async move {
                rtc.sleep(Duration::from_millis(100)).await;
                sink.send(bad_reply).await.unwrap();
                sink
            })
            .unwrap();
        let outcome = circ.extend_ntor(&target, params).await;
        // Wait for the sender task to finish; it hands the sink back to us.
        let _sink = sink_handle.await;

        assert_eq!(circ.n_hops().unwrap(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }
2282
    /// An EXTENDED2 reply that seems to come from hop 1 must make the extend
    /// fail with `Error::CircuitClosed`.
    #[traced_test]
    #[test]
    fn bad_extend_wronghop() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![]).into();
            let cc = rmsg_to_ccmsg(None, extended2);

            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
            // This case shows up as `Error::CircuitClosed`, since a message sent
            // from the wrong hop won't even be delivered to the extend
            // code's meta-handler.  Instead the unexpected message will cause
            // the circuit to get torn down.
            match error {
                Error::CircuitClosed => {}
                x => panic!("got other error: {}", x),
            }
        });
    }
2301
    /// Replying with an EXTENDED message where an EXTENDED2 is expected must
    /// make the extend fail with an invalid-message `BytesErr` for the
    /// "extended2 message" object.
    #[traced_test]
    #[test]
    fn bad_extend_wrongtype() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended = relaymsg::Extended::new(vec![7; 200]).into();
            let cc = rmsg_to_ccmsg(None, extended);

            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::BytesErr {
                    err: tor_bytes::Error::InvalidMessage(_),
                    object: "extended2 message",
                } => {}
                other => panic!("{:?}", other),
            }
        });
    }
2319
2320    #[traced_test]
2321    #[test]
2322    fn bad_extend_destroy() {
2323        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2324            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
2325            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2326            match error {
2327                Error::CircuitClosed => {}
2328                other => panic!("{:?}", other),
2329            }
2330        });
2331    }
2332
2333    #[traced_test]
2334    #[test]
2335    fn bad_extend_crypto() {
2336        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2337            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
2338            let cc = rmsg_to_ccmsg(None, extended2);
2339            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2340            assert!(matches!(error, Error::BadCircHandshakeAuth));
2341        });
2342    }
2343
2344    #[traced_test]
2345    #[test]
2346    fn begindir() {
2347        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2348            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2349            let (circ, mut sink) = newcirc(&rt, chan).await;
2350            let circid = circ.peek_circid();
2351
2352            let begin_and_send_fut = async move {
2353                // Here we'll say we've got a circuit, and we want to
2354                // make a simple BEGINDIR request with it.
2355                let mut stream = circ.begin_dir_stream().await.unwrap();
2356                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
2357                stream.flush().await.unwrap();
2358                let mut buf = [0_u8; 1024];
2359                let n = stream.read(&mut buf).await.unwrap();
2360                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
2361                let n = stream.read(&mut buf).await.unwrap();
2362                assert_eq!(n, 0);
2363                stream
2364            };
2365            let reply_fut = async move {
2366                // We've disabled encryption on this circuit, so we can just
2367                // read the begindir cell.
2368                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2369                assert_eq!(id, Some(circid));
2370                let rmsg = match chmsg {
2371                    AnyChanMsg::Relay(r) => {
2372                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2373                            .unwrap()
2374                    }
2375                    other => panic!("{:?}", other),
2376                };
2377                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2378                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
2379
2380                // Reply with a Connected cell to indicate success.
2381                let connected = relaymsg::Connected::new_empty().into();
2382                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2383
2384                // Now read a DATA cell...
2385                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2386                assert_eq!(id, Some(circid));
2387                let rmsg = match chmsg {
2388                    AnyChanMsg::Relay(r) => {
2389                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2390                            .unwrap()
2391                    }
2392                    other => panic!("{:?}", other),
2393                };
2394                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
2395                assert_eq!(streamid_2, streamid);
2396                if let AnyRelayMsg::Data(d) = rmsg {
2397                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
2398                } else {
2399                    panic!();
2400                }
2401
2402                // Write another data cell in reply!
2403                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
2404                    .unwrap()
2405                    .into();
2406                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2407
2408                // Send an END cell to say that the conversation is over.
2409                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2410                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2411
2412                (rx, sink) // gotta keep these alive, or the reactor will exit.
2413            };
2414
2415            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2416        });
2417    }
2418
2419    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
2420    fn close_stream_helper(by_drop: bool) {
2421        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2422            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2423            let (circ, mut sink) = newcirc(&rt, chan).await;
2424
2425            let stream_fut = async move {
2426                let stream = circ
2427                    .begin_stream("www.example.com", 80, None)
2428                    .await
2429                    .unwrap();
2430
2431                let (r, mut w) = stream.split();
2432                if by_drop {
2433                    // Drop the writer and the reader, which should close the stream.
2434                    drop(r);
2435                    drop(w);
2436                    (None, circ) // make sure to keep the circuit alive
2437                } else {
2438                    // Call close on the writer, while keeping the reader alive.
2439                    w.close().await.unwrap();
2440                    (Some(r), circ)
2441                }
2442            };
2443            let handler_fut = async {
2444                // Read the BEGIN message.
2445                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2446                let rmsg = match msg {
2447                    AnyChanMsg::Relay(r) => {
2448                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2449                            .unwrap()
2450                    }
2451                    other => panic!("{:?}", other),
2452                };
2453                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2454                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2455
2456                // Reply with a CONNECTED.
2457                let connected =
2458                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2459                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2460
2461                // Expect an END.
2462                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2463                let rmsg = match msg {
2464                    AnyChanMsg::Relay(r) => {
2465                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2466                            .unwrap()
2467                    }
2468                    other => panic!("{:?}", other),
2469                };
2470                let (_, rmsg) = rmsg.into_streamid_and_msg();
2471                assert_eq!(rmsg.cmd(), RelayCmd::END);
2472
2473                (rx, sink) // keep these alive or the reactor will exit.
2474            };
2475
2476            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2477        });
2478    }
2479
2480    #[traced_test]
2481    #[test]
2482    fn drop_stream() {
2483        close_stream_helper(true);
2484    }
2485
2486    #[traced_test]
2487    #[test]
2488    fn close_stream() {
2489        close_stream_helper(false);
2490    }
2491
2492    // Set up a circuit and stream that expects some incoming SENDMEs.
2493    async fn setup_incoming_sendme_case<R: Runtime>(
2494        rt: &R,
2495        n_to_send: usize,
2496    ) -> (
2497        Arc<ClientCirc>,
2498        DataStream,
2499        CircuitRxSender,
2500        Option<StreamId>,
2501        usize,
2502        Receiver<AnyChanCell>,
2503        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2504    ) {
2505        let (chan, mut rx, sink2) = working_fake_channel(rt);
2506        let (circ, mut sink) = newcirc(rt, chan).await;
2507        let circid = circ.peek_circid();
2508
2509        let begin_and_send_fut = {
2510            let circ = circ.clone();
2511            async move {
2512                // Take our circuit and make a stream on it.
2513                let mut stream = circ
2514                    .begin_stream("www.example.com", 443, None)
2515                    .await
2516                    .unwrap();
2517                let junk = [0_u8; 1024];
2518                let mut remaining = n_to_send;
2519                while remaining > 0 {
2520                    let n = std::cmp::min(remaining, junk.len());
2521                    stream.write_all(&junk[..n]).await.unwrap();
2522                    remaining -= n;
2523                }
2524                stream.flush().await.unwrap();
2525                stream
2526            }
2527        };
2528
2529        let receive_fut = async move {
2530            // Read the begin cell.
2531            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2532            let rmsg = match chmsg {
2533                AnyChanMsg::Relay(r) => {
2534                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2535                        .unwrap()
2536                }
2537                other => panic!("{:?}", other),
2538            };
2539            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2540            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2541            // Reply with a connected cell...
2542            let connected = relaymsg::Connected::new_empty().into();
2543            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2544            // Now read bytes from the stream until we have them all.
2545            let mut bytes_received = 0_usize;
2546            let mut cells_received = 0_usize;
2547            while bytes_received < n_to_send {
2548                // Read a data cell, and remember how much we got.
2549                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2550                assert_eq!(id, Some(circid));
2551
2552                let rmsg = match chmsg {
2553                    AnyChanMsg::Relay(r) => {
2554                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2555                            .unwrap()
2556                    }
2557                    other => panic!("{:?}", other),
2558                };
2559                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2560                assert_eq!(streamid2, streamid);
2561                if let AnyRelayMsg::Data(dat) = rmsg {
2562                    cells_received += 1;
2563                    bytes_received += dat.as_ref().len();
2564                } else {
2565                    panic!();
2566                }
2567            }
2568
2569            (sink, streamid, cells_received, rx)
2570        };
2571
2572        let (stream, (sink, streamid, cells_received, rx)) =
2573            futures::join!(begin_and_send_fut, receive_fut);
2574
2575        (circ, stream, sink, streamid, cells_received, rx, sink2)
2576    }
2577
2578    #[traced_test]
2579    #[test]
2580    fn accept_valid_sendme() {
2581        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2582            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2583                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2584
2585            assert_eq!(cells_received, 301);
2586
2587            // Make sure that the circuit is indeed expecting the right sendmes
2588            {
2589                let (tx, rx) = oneshot::channel();
2590                circ.command
2591                    .unbounded_send(CtrlCmd::QuerySendWindow {
2592                        hop: 2.into(),
2593                        leg: circ.unique_id(),
2594                        done: tx,
2595                    })
2596                    .unwrap();
2597                let (window, tags) = rx.await.unwrap().unwrap();
2598                assert_eq!(window, 1000 - 301);
2599                assert_eq!(tags.len(), 3);
2600                // 100
2601                assert_eq!(
2602                    tags[0],
2603                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2604                );
2605                // 200
2606                assert_eq!(
2607                    tags[1],
2608                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2609                );
2610                // 300
2611                assert_eq!(
2612                    tags[2],
2613                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2614                );
2615            }
2616
2617            let reply_with_sendme_fut = async move {
2618                // make and send a circuit-level sendme.
2619                let c_sendme =
2620                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2621                        .into();
2622                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2623
2624                // Make and send a stream-level sendme.
2625                let s_sendme = relaymsg::Sendme::new_empty().into();
2626                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2627
2628                sink
2629            };
2630
2631            let _sink = reply_with_sendme_fut.await;
2632
2633            rt.advance_until_stalled().await;
2634
2635            // Now make sure that the circuit is still happy, and its
2636            // window is updated.
2637            {
2638                let (tx, rx) = oneshot::channel();
2639                circ.command
2640                    .unbounded_send(CtrlCmd::QuerySendWindow {
2641                        hop: 2.into(),
2642                        leg: circ.unique_id(),
2643                        done: tx,
2644                    })
2645                    .unwrap();
2646                let (window, _tags) = rx.await.unwrap().unwrap();
2647                assert_eq!(window, 1000 - 201);
2648            }
2649        });
2650    }
2651
2652    #[traced_test]
2653    #[test]
2654    fn invalid_circ_sendme() {
2655        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2656            // Same setup as accept_valid_sendme() test above but try giving
2657            // a sendme with the wrong tag.
2658
2659            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2660                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2661
2662            let reply_with_sendme_fut = async move {
2663                // make and send a circuit-level sendme with a bad tag.
2664                let c_sendme =
2665                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2666                        .into();
2667                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2668                sink
2669            };
2670
2671            let _sink = reply_with_sendme_fut.await;
2672
2673            // Check whether the reactor dies as a result of receiving invalid data.
2674            rt.advance_until_stalled().await;
2675            assert!(circ.is_closing());
2676        });
2677    }
2678
2679    #[traced_test]
2680    #[test]
2681    fn test_busy_stream_fairness() {
2682        // Number of streams to use.
2683        const N_STREAMS: usize = 3;
2684        // Number of cells (roughly) for each stream to send.
2685        const N_CELLS: usize = 20;
2686        // Number of bytes that *each* stream will send, and that we'll read
2687        // from the channel.
2688        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2689        // Ignoring cell granularity, with perfect fairness we'd expect
2690        // `N_BYTES/N_STREAMS` bytes from each stream.
2691        //
2692        // We currently allow for up to a full cell less than that.  This is
2693        // somewhat arbitrary and can be changed as needed, since we don't
2694        // provide any specific fairness guarantees.
2695        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2696            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2697
2698        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2699            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2700            let (circ, mut sink) = newcirc(&rt, chan).await;
2701
2702            // Run clients in a single task, doing our own round-robin
2703            // scheduling of writes to the reactor. Conversely, if we were to
2704            // put each client in its own task, we would be at the the mercy of
2705            // how fairly the runtime schedules the client tasks, which is outside
2706            // the scope of this test.
2707            rt.spawn({
2708                // Clone the circuit to keep it alive after writers have
2709                // finished with it.
2710                let circ = circ.clone();
2711                async move {
2712                    let mut clients = VecDeque::new();
2713                    struct Client {
2714                        stream: DataStream,
2715                        to_write: &'static [u8],
2716                    }
2717                    for _ in 0..N_STREAMS {
2718                        clients.push_back(Client {
2719                            stream: circ
2720                                .begin_stream("www.example.com", 80, None)
2721                                .await
2722                                .unwrap(),
2723                            to_write: &[0_u8; N_BYTES][..],
2724                        });
2725                    }
2726                    while let Some(mut client) = clients.pop_front() {
2727                        if client.to_write.is_empty() {
2728                            // Client is done. Don't put back in queue.
2729                            continue;
2730                        }
2731                        let written = client.stream.write(client.to_write).await.unwrap();
2732                        client.to_write = &client.to_write[written..];
2733                        clients.push_back(client);
2734                    }
2735                }
2736            })
2737            .unwrap();
2738
2739            let channel_handler_fut = async {
2740                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2741                let mut total_bytes_received = 0;
2742
2743                loop {
2744                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2745                    let rmsg = match msg {
2746                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2747                            RelayCellFormat::V0,
2748                            r.into_relay_body(),
2749                        )
2750                        .unwrap(),
2751                        other => panic!("Unexpected chanmsg: {other:?}"),
2752                    };
2753                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2754                    match rmsg.cmd() {
2755                        RelayCmd::BEGIN => {
2756                            // Add an entry for this stream.
2757                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2758                            assert_eq!(prev, None);
2759                            // Reply with a CONNECTED.
2760                            let connected = relaymsg::Connected::new_with_addr(
2761                                "10.0.0.1".parse().unwrap(),
2762                                1234,
2763                            )
2764                            .into();
2765                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2766                        }
2767                        RelayCmd::DATA => {
2768                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2769                            let nbytes = data_msg.as_ref().len();
2770                            total_bytes_received += nbytes;
2771                            let streamid = streamid.unwrap();
2772                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2773                            *stream_bytes += nbytes;
2774                            if total_bytes_received >= N_BYTES {
2775                                break;
2776                            }
2777                        }
2778                        RelayCmd::END => {
2779                            // Stream is done. If fair scheduling is working as
2780                            // expected we *probably* shouldn't get here, but we
2781                            // can ignore it and save the failure until we
2782                            // actually have the final stats.
2783                            continue;
2784                        }
2785                        other => {
2786                            panic!("Unexpected command {other:?}");
2787                        }
2788                    }
2789                }
2790
2791                // Return our stats, along with the `rx` and `sink` to keep the
2792                // reactor alive (since clients could still be writing).
2793                (total_bytes_received, stream_bytes_received, rx, sink)
2794            };
2795
2796            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2797                channel_handler_fut.await;
2798            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2799            for (sid, stream_bytes) in stream_bytes_received {
2800                assert!(
2801                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2802                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2803                );
2804            }
2805        });
2806    }
2807
// Test: `extend_by_ed25519_id` is set in the default circuit parameters, and
// can be cleared.
#[test]
fn basic_params() {
    use super::CircParameters;

    let defaults = CircParameters::default();
    assert!(defaults.extend_by_ed25519_id);

    let mut modified = defaults;
    modified.extend_by_ed25519_id = false;
    assert!(!modified.extend_by_ed25519_id);
}
2817
// A stream request filter for tests: it accepts every incoming request.
#[cfg(feature = "hs-service")]
struct AllowAllStreamsFilter;

#[cfg(feature = "hs-service")]
impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
    // Neither the request context nor the circuit view is consulted.
    fn disposition(
        &mut self,
        _request: &crate::stream::IncomingStreamRequestContext<'_>,
        _view: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
    ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
        use crate::stream::IncomingStreamRequestDisposition::Accept;

        Ok(Accept)
    }
}
2830
// Test: asking a circuit to allow stream requests a second time, while the
// first request handle still exists, fails.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests_twice() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, _send) = newcirc(&rt, chan).await;

        let allowed_cmds = [tor_cell::relaycell::RelayCmd::BEGIN];

        // The first registration succeeds; its handle is kept alive while
        // the second attempt is made.
        let _first = circ
            .allow_stream_requests(
                &allowed_cmds,
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let second = circ
            .allow_stream_requests(
                &allowed_cmds,
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await;

        // There can only be one IncomingStream at a time on any given circuit.
        assert!(second.is_err());
    });
}
2860
// Test: accept an incoming stream request on a circuit, and read the data
// that the requester then sends on the new stream.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        let rfmt = RelayCellFormat::V0;

        // Used by the "service" to tell the "client" that the stream has been accepted.
        let (ready_tx, ready_rx) = oneshot::channel();
        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            let request = incoming.next().await.unwrap();
            let mut data_stream = request
                .accept_data(relaymsg::Connected::new_empty())
                .await
                .unwrap();
            // Tell the client task that DATA cells can be accepted now.
            ready_tx.send(()).unwrap();

            // Read what the client sent, and check it.
            let mut buf = [0_u8; TEST_DATA.len()];
            data_stream.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, TEST_DATA);

            circ
        };

        let simulate_client = async move {
            // Encode a relay message on stream 12 as a RELAY channel message.
            let relay_cell = |msg: AnyRelayMsg| {
                let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
                chanmsg::Relay::from(body)
            };

            // Pretend to be a client at the other end of the circuit, and
            // send a BEGIN cell.
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_msg = relay_cell(AnyRelayMsg::Begin(begin));
            send.send(ClientCircChanMsg::Relay(begin_msg))
                .await
                .unwrap();

            // Wait until the service is ready to accept data.
            // TODO: we shouldn't need to wait! This is needed because the service will reject
            // any DATA cells that aren't associated with a known stream. We need to wait until
            // the service receives our BEGIN cell (and the reactor updates hop.map with the
            // new stream).
            ready_rx.await.unwrap();

            // Now send some data on the newly established stream.
            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_msg = relay_cell(AnyRelayMsg::Data(data));
            send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2937
// Test: after one incoming stream request has been rejected, a second,
// identical request can still be accepted and used to carry data.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn accept_stream_after_reject() {
    use tor_cell::relaycell::msg::BeginFlags;
    use tor_cell::relaycell::msg::EndReason;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";
        const STREAM_COUNT: usize = 2;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        // Used by the "service" to tell the "client" that a request has been handled.
        let (mut handled_tx, mut handled_rx) = mpsc::channel(STREAM_COUNT);

        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            // Handle STREAM_COUNT requests: reject the first, accept the rest.
            for i in 0..STREAM_COUNT {
                let request = incoming.next().await.unwrap();

                if i == 0 {
                    request
                        .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                        .await
                        .unwrap();
                    // Tell the client that this request has been dealt with.
                    handled_tx.send(()).await.unwrap();
                } else {
                    let mut data_stream = request
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    // Tell the client task that DATA cells can be accepted now.
                    handled_tx.send(()).await.unwrap();

                    // Read what the client sent, and check it.
                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }
            }

            circ
        };

        let simulate_client = async move {
            // Encode a relay message on stream 12 as a RELAY channel message.
            let relay_cell = |msg: AnyRelayMsg| {
                let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
                chanmsg::Relay::from(body)
            };

            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_msg = relay_cell(AnyRelayMsg::Begin(begin));

            // Pretend to be a client at the other end of the circuit, sending
            // the same BEGIN cell STREAM_COUNT times (the test service
            // rejects the first one).
            for _ in 0..STREAM_COUNT {
                send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
                    .await
                    .unwrap();

                // Wait until the service has handled this request, whether
                // by rejecting it or by accepting it.
                handled_rx.next().await.unwrap();
            }

            // Now send some data on the stream that was accepted.
            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_msg = relay_cell(AnyRelayMsg::Data(data));
            send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
3031
3032    #[traced_test]
3033    #[test]
3034    #[cfg(feature = "hs-service")]
3035    fn incoming_stream_bad_hop() {
3036        use tor_cell::relaycell::msg::BeginFlags;
3037
3038        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
3039            /// Expect the originator of the BEGIN cell to be hop 1.
3040            const EXPECTED_HOP: u8 = 1;
3041            let rfmt = RelayCellFormat::V0;
3042
3043            let (chan, _rx, _sink) = working_fake_channel(&rt);
3044            let (circ, mut send) = newcirc(&rt, chan).await;
3045
3046            // Expect to receive incoming streams from hop EXPECTED_HOP
3047            let mut incoming = circ
3048                .allow_stream_requests(
3049                    &[tor_cell::relaycell::RelayCmd::BEGIN],
3050                    (circ.unique_id(), EXPECTED_HOP.into()).into(),
3051                    AllowAllStreamsFilter,
3052                )
3053                .await
3054                .unwrap();
3055
3056            let simulate_service = async move {
3057                // The originator of the cell is actually the last hop on the circuit, not hop 1,
3058                // so we expect the reactor to shut down.
3059                assert!(incoming.next().await.is_none());
3060                circ
3061            };
3062
3063            let simulate_client = async move {
3064                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
3065                let body: BoxedCellBody =
3066                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
3067                        .encode(rfmt, &mut testing_rng())
3068                        .unwrap();
3069                let begin_msg = chanmsg::Relay::from(body);
3070
3071                // Pretend to be a client at the other end of the circuit sending a begin cell
3072                send.send(ClientCircChanMsg::Relay(begin_msg))
3073                    .await
3074                    .unwrap();
3075
3076                send
3077            };
3078
3079            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
3080        });
3081    }
3082
3083    #[traced_test]
3084    #[test]
3085    #[cfg(feature = "conflux")]
3086    fn multipath_circ_validation() {
3087        use std::error::Error as _;
3088
3089        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3090            let params = CircParameters::default();
3091            let invalid_tunnels = [
3092                setup_bad_conflux_tunnel(&rt).await,
3093                setup_conflux_tunnel(&rt, true, params).await,
3094            ];
3095
3096            for tunnel in invalid_tunnels {
3097                let TestTunnelCtx {
3098                    tunnel: _tunnel,
3099                    circs: _circs,
3100                    conflux_link_rx,
3101                } = tunnel;
3102
3103                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
3104                let err_src = conflux_hs_err.source().unwrap();
3105
3106                // The two circuits don't end in the same hop (no join point),
3107                // so the reactor will refuse to link them
3108                assert!(err_src
3109                    .to_string()
3110                    .contains("one more more conflux circuits are invalid"));
3111            }
3112        });
3113    }
3114
    // TODO: this structure could be reused for the other tests,
    // to address nickm's comment:
    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
    /// The test-side handles for a single circuit built on a fake channel.
    #[derive(Debug)]
    // Not every test reads every field: some are only held so that the
    // corresponding channel endpoints stay alive for the duration of the test.
    #[allow(unused)]
    #[cfg(feature = "conflux")]
    struct TestCircuitCtx {
        /// Receiver for the cells the circuit sends out on its fake channel
        /// (for example, the LINK cell read by `await_link_payload`).
        chan_rx: Receiver<AnyChanCell>,
        /// The sink returned by `working_fake_channel` alongside the channel.
        ///
        /// NOTE(review): presumably used to feed cells into the fake channel;
        /// none of the conflux tests visible here write to it.
        chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
        /// Sender the tests use to deliver messages to the circuit.
        circ_tx: CircuitRxSender,
        /// The unique identifier of the circuit.
        unique_id: UniqId,
    }
3127
    /// Everything a test needs to drive a tunnel created by `setup_conflux_tunnel`.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct TestTunnelCtx {
        /// Handle to the first circuit, which was asked to link with the second one.
        tunnel: Arc<ClientCirc>,
        /// The test contexts of the circuit legs (first circuit, then second).
        circs: Vec<TestCircuitCtx>,
        /// Receives the reactor's answer to the `LinkCircuits` request.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    }
3135
3136    /// Wait for a LINK cell to arrive on the specified channel and return its payload.
3137    #[cfg(feature = "conflux")]
3138    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
3139        // Wait for the LINK cell...
3140        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3141        let rmsg = match chmsg {
3142            AnyChanMsg::Relay(r) => {
3143                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3144                    .unwrap()
3145            }
3146            other => panic!("{:?}", other),
3147        };
3148        let (streamid, rmsg) = rmsg.into_streamid_and_msg();
3149
3150        let link = match rmsg {
3151            AnyRelayMsg::ConfluxLink(link) => link,
3152            _ => panic!("unexpected relay message {rmsg:?}"),
3153        };
3154
3155        assert!(streamid.is_none());
3156
3157        link
3158    }
3159
3160    #[cfg(feature = "conflux")]
3161    async fn setup_conflux_tunnel(
3162        rt: &MockRuntime,
3163        same_hops: bool,
3164        params: CircParameters,
3165    ) -> TestTunnelCtx {
3166        let hops1 = hop_details(3, 0);
3167        let hops2 = if same_hops {
3168            hops1.clone()
3169        } else {
3170            hop_details(3, 10)
3171        };
3172
3173        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
3174        let (circ1, sink1) = newcirc_ext(
3175            rt,
3176            UniqId::new(1, 3),
3177            chan1,
3178            hops1,
3179            2.into(),
3180            params.clone(),
3181        )
3182        .await;
3183
3184        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
3185
3186        let (circ2, sink2) =
3187            newcirc_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
3188
3189        let (answer_tx, answer_rx) = oneshot::channel();
3190        circ2
3191            .command
3192            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
3193            .unwrap();
3194
3195        let circuit = answer_rx.await.unwrap().unwrap();
3196        // The circuit should be shutting down its reactor
3197        rt.advance_until_stalled().await;
3198        assert!(circ2.is_closing());
3199
3200        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
3201        // Tell the first circuit to link with the second and form a multipath tunnel
3202        circ1
3203            .control
3204            .unbounded_send(CtrlMsg::LinkCircuits {
3205                circuits: vec![circuit],
3206                answer: conflux_link_tx,
3207            })
3208            .unwrap();
3209
3210        let circ_ctx1 = TestCircuitCtx {
3211            chan_rx: rx1,
3212            chan_tx: chan_sink1,
3213            circ_tx: sink1,
3214            unique_id: circ1.unique_id(),
3215        };
3216
3217        let circ_ctx2 = TestCircuitCtx {
3218            chan_rx: rx2,
3219            chan_tx: chan_sink2,
3220            circ_tx: sink2,
3221            unique_id: circ2.unique_id(),
3222        };
3223
3224        TestTunnelCtx {
3225            tunnel: circ1,
3226            circs: vec![circ_ctx1, circ_ctx2],
3227            conflux_link_rx,
3228        }
3229    }
3230
3231    #[cfg(feature = "conflux")]
3232    async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
3233        // Our 2 test circuits are identical, so they both have the same guards,
3234        // which technically violates the conflux set rule mentioned in prop354.
3235        // For testing purposes this is fine, but in production we'll need to ensure
3236        // the calling code prevents guard reuse (except in the case where
3237        // one of the guards happens to be Guard + Exit)
3238        let same_hops = true;
3239        let params = CircParameters::new(true, build_cc_vegas_params());
3240        setup_conflux_tunnel(rt, same_hops, params).await
3241    }
3242
3243    #[cfg(feature = "conflux")]
3244    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
3245        // The two circuits don't share any hops,
3246        // so they won't end in the same hop (no join point),
3247        // causing the reactor to refuse to link them.
3248        let same_hops = false;
3249        let params = CircParameters::new(true, build_cc_vegas_params());
3250        setup_conflux_tunnel(rt, same_hops, params).await
3251    }
3252
3253    #[traced_test]
3254    #[test]
3255    #[cfg(feature = "conflux")]
3256    fn reject_conflux_linked_before_hs() {
3257        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3258            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3259            let (circ, mut sink) = newcirc(&rt, chan).await;
3260
3261            let nonce = V1Nonce::new(&mut testing_rng());
3262            let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3263            // Send a LINKED cell
3264            let linked = relaymsg::ConfluxLinked::new(payload).into();
3265            sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3266
3267            rt.advance_until_stalled().await;
3268            assert!(circ.is_closing());
3269        });
3270    }
3271
3272    #[traced_test]
3273    #[test]
3274    #[cfg(feature = "conflux")]
3275    fn conflux_hs_timeout() {
3276        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3277            let TestTunnelCtx {
3278                tunnel: _tunnel,
3279                circs,
3280                conflux_link_rx,
3281            } = setup_good_conflux_tunnel(&rt).await;
3282
3283            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3284
3285            // Wait for the LINK cell
3286            let link = await_link_payload(&mut circ1.chan_rx).await;
3287
3288            // Send a LINK cell on the first leg...
3289            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3290            circ1
3291                .circ_tx
3292                .send(rmsg_to_ccmsg(None, linked))
3293                .await
3294                .unwrap();
3295
3296            // Do nothing, and wait for the handshake to timeout on the second leg
3297            rt.advance_by(Duration::from_secs(60)).await;
3298
3299            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3300
3301            // Get the handshake results of each circuit
3302            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
3303                conflux_hs_res.try_into().unwrap();
3304
3305            assert!(res1.is_ok());
3306
3307            let err = res2.unwrap_err();
3308            assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
3309        });
3310    }
3311
3312    #[traced_test]
3313    #[test]
3314    #[cfg(feature = "conflux")]
3315    fn conflux_bad_hs() {
3316        use crate::util::err::ConfluxHandshakeError;
3317
3318        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3319            let nonce = V1Nonce::new(&mut testing_rng());
3320            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3321            //let extended2 = relaymsg::Extended2::new(vec![]).into();
3322            let bad_hs_responses = [
3323                (
3324                    rmsg_to_ccmsg(
3325                        None,
3326                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
3327                    ),
3328                    "Received CONFLUX_LINKED cell with mismatched nonce",
3329                ),
3330                (
3331                    rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
3332                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
3333                ),
3334                (
3335                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3336                    "Received CONFLUX_SWITCH on unlinked circuit?!",
3337                ),
3338                // TODO: this currently causes the reactor to shut down immediately,
3339                // without sending a response on the handshake channel
3340                /*
3341                (
3342                    rmsg_to_ccmsg(None, extended2),
3343                    "Received CONFLUX_LINKED cell with mismatched nonce",
3344                ),
3345                */
3346            ];
3347
3348            for (bad_cell, expected_err) in bad_hs_responses {
3349                let TestTunnelCtx {
3350                    tunnel,
3351                    circs,
3352                    conflux_link_rx,
3353                } = setup_good_conflux_tunnel(&rt).await;
3354
3355                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3356
3357                // Respond with a bogus cell on one of the legs
3358                circ2.circ_tx.send(bad_cell).await.unwrap();
3359
3360                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3361                // Get the handshake results (the handshake results are reported early,
3362                // without waiting for the second circuit leg's handshake to timeout,
3363                // because this is a protocol violation causing the entire tunnel to shut down)
3364                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
3365                    conflux_hs_res.try_into().unwrap();
3366
3367                match res2.unwrap_err() {
3368                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
3369                        assert_eq!(e, expected_err);
3370                    }
3371                    e => panic!("unexpected error: {e:?}"),
3372                }
3373
3374                assert!(tunnel.is_closing());
3375            }
3376        });
3377    }
3378
3379    #[traced_test]
3380    #[test]
3381    #[cfg(feature = "conflux")]
3382    fn unexpected_conflux_cell() {
3383        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3384            let nonce = V1Nonce::new(&mut testing_rng());
3385            let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3386            let bad_cells = [
3387                rmsg_to_ccmsg(
3388                    None,
3389                    relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
3390                ),
3391                rmsg_to_ccmsg(
3392                    None,
3393                    relaymsg::ConfluxLink::new(link_payload.clone()).into(),
3394                ),
3395                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3396            ];
3397
3398            for bad_cell in bad_cells {
3399                let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3400                let (circ, mut sink) = newcirc(&rt, chan).await;
3401
3402                sink.send(bad_cell).await.unwrap();
3403                rt.advance_until_stalled().await;
3404
3405                // Note: unfortunately we can't assert the circuit is
3406                // closing for the reason, because the reactor just logs
3407                // the error and then exits.
3408                assert!(circ.is_closing());
3409            }
3410        });
3411    }
3412
3413    #[traced_test]
3414    #[test]
3415    #[cfg(feature = "conflux")]
3416    fn conflux_bad_linked() {
3417        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3418            let TestTunnelCtx {
3419                tunnel,
3420                circs,
3421                conflux_link_rx: _,
3422            } = setup_good_conflux_tunnel(&rt).await;
3423
3424            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3425
3426            let link = await_link_payload(&mut circ1.chan_rx).await;
3427
3428            // Send a LINK cell on the first leg...
3429            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3430            circ1
3431                .circ_tx
3432                .send(rmsg_to_ccmsg(None, linked))
3433                .await
3434                .unwrap();
3435
3436            // ...and two LINKED cells on the second
3437            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3438            circ2
3439                .circ_tx
3440                .send(rmsg_to_ccmsg(None, linked))
3441                .await
3442                .unwrap();
3443            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3444            circ2
3445                .circ_tx
3446                .send(rmsg_to_ccmsg(None, linked))
3447                .await
3448                .unwrap();
3449
3450            rt.advance_until_stalled().await;
3451
3452            // Receiving a LINKED cell on an already linked leg causes
3453            // the tunnel to be torn down
3454            assert!(tunnel.is_closing());
3455        });
3456    }
3457
3458    #[traced_test]
3459    #[test]
3460    #[cfg(feature = "conflux")]
3461    fn conflux_bad_switch() {
3462        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3463            let bad_switch = [
3464                // SWITCH cells with seqno = 0 are not allowed
3465                relaymsg::ConfluxSwitch::new(0),
3466                // TODO(#2031): from c-tor:
3467                //
3468                // We have to make sure that the switch command is truly
3469                // incrementing the sequence number, or else it becomes
3470                // a side channel that can be spammed for traffic analysis.
3471                //
3472                // We should figure out what this check is supposed to look like,
3473                // and have a test for it
3474            ];
3475
3476            for bad_cell in bad_switch {
3477                let TestTunnelCtx {
3478                    tunnel,
3479                    circs,
3480                    conflux_link_rx,
3481                } = setup_good_conflux_tunnel(&rt).await;
3482
3483                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3484
3485                let link = await_link_payload(&mut circ1.chan_rx).await;
3486
3487                // Send a LINKED cell on both legs
3488                for circ in [&mut circ1, &mut circ2] {
3489                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3490                    circ.circ_tx
3491                        .send(rmsg_to_ccmsg(None, linked))
3492                        .await
3493                        .unwrap();
3494                }
3495
3496                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3497                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3498
3499                // Now send a bad SWITCH cell on *both* legs.
3500                // This will cause both legs to be removed from the conflux set,
3501                // which causes the tunnel reactor to shut down
3502                for circ in [&mut circ1, &mut circ2] {
3503                    let msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
3504                    circ.circ_tx.send(msg).await.unwrap();
3505                }
3506
3507                // The tunnel should be shutting down
3508                rt.advance_until_stalled().await;
3509                assert!(tunnel.is_closing());
3510            }
3511        });
3512    }
3513
3514    // TODO(conflux): add a test for SWITCH handling
3515
    /// One endpoint of a conflux test: either a mock exit relay,
    /// or the client under test.
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
        /// Pretend to be an exit relay.
        Relay(ConfluxExitState<I>),
        /// Client task.
        Client {
            /// Channel for receiving the outcome of the conflux handshakes.
            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
            /// The tunnel reactor handle.
            tunnel: Arc<ClientCirc>,
            /// Data to send on a stream.
            send_data: Vec<u8>,
            /// Data we expect to receive on a stream.
            recv_data: Vec<u8>,
        },
    }
3534
    /// Structure for returning the sinks, channels, etc. that must stay
    /// alive until the test is complete.
    // The fields are only held to keep them alive, and the variants differ
    // a lot in size, hence the lints silenced here.
    #[allow(unused, clippy::large_enum_variant)]
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    enum ConfluxEndpointResult {
        /// The tunnel handle and data stream to keep alive.
        ///
        /// NOTE(review): presumably returned by the client endpoint -- confirm
        /// against the code that constructs this variant.
        Circuit {
            /// The tunnel reactor handle.
            tunnel: Arc<ClientCirc>,
            /// The data stream opened on the tunnel.
            stream: DataStream,
        },
        /// The circuit context of a mock exit leg.
        Relay {
            /// The test context (channels and sinks) of the leg.
            circ: TestCircuitCtx,
        },
    }
3549
    /// Stream data, shared by all the mock exit endpoints.
    ///
    /// The mock exits access it through the `Arc<Mutex<_>>`
    /// stored in [`ConfluxExitState`].
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxStreamState {
        /// The data received so far on this stream (at the exit).
        data_recvd: Vec<u8>,
        /// The total amount of data we expect to receive on this stream.
        ///
        /// A mock exit considers itself done reading once `data_recvd`
        /// has reached this length.
        expected_data_len: usize,
        /// Whether we have seen a BEGIN cell yet.
        begin_recvd: bool,
        /// Whether we have seen an END cell yet.
        end_recvd: bool,
        /// Whether we have sent an END cell yet.
        end_sent: bool,
    }
3565
3566    #[cfg(feature = "conflux")]
3567    impl ConfluxStreamState {
3568        fn new(expected_data_len: usize) -> Self {
3569            Self {
3570                data_recvd: vec![],
3571                expected_data_len,
3572                begin_recvd: false,
3573                end_recvd: false,
3574                end_sent: false,
3575            }
3576        }
3577    }
3578
    /// An object describing a SWITCH cell that we expect to receive
    /// in the mock exit.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ExpectedSwitch {
        /// The number of cells we've seen on this leg so far,
        /// up to and including the SWITCH.
        cells_so_far: usize,
        /// The expected seqno in the SWITCH cell.
        seqno: u32,
    }
3590
    /// Object dispatching cells for delivery on the appropriate
    /// leg in a multipath tunnel.
    ///
    /// Used to send out-of-order cells from the mock exit
    /// to the client under test.
    #[cfg(feature = "conflux")]
    struct CellDispatcher {
        /// Channels on which to send the [`CellToSend`] commands,
        /// keyed by the unique ID of the target leg.
        leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
        /// The list of cells to send, in sending order,
        /// each paired with the unique ID of the leg to send it on.
        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    }
3603
3604    #[cfg(feature = "conflux")]
3605    impl CellDispatcher {
3606        async fn run(mut self) {
3607            while !self.cells_to_send.is_empty() {
3608                let (circ_id, cell) = self.cells_to_send.remove(0);
3609                let cell_tx = self.leg_tx.get_mut(&circ_id).unwrap();
3610                let (done_tx, done_rx) = oneshot::channel();
3611                cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
3612                // Wait for the cell to be sent before sending the next one.
3613                let () = done_rx.await.unwrap();
3614            }
3615        }
3616    }
3617
    /// A cell for the mock exit to send on one of its legs.
    ///
    /// These are created by [`CellDispatcher::run`], one per cell.
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    struct CellToSend {
        /// Channel for notifying the control task that the cell was sent.
        ///
        /// The dispatcher waits for this notification
        /// before handing out the next cell.
        done_tx: oneshot::Sender<()>,
        /// The cell to send.
        cell: AnyRelayMsg,
    }
3627
    /// The state of a mock exit.
    ///
    /// There is one of these per circuit leg.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
        /// The runtime, shared by the test client and mock exit tasks.
        ///
        /// The mutex prevents the client and mock exit tasks from calling
        /// functions like [`MockRuntime::advance_until_stalled`]
        /// or [`MockRuntime::progress_until_stalled`] concurrently,
        /// as this is not supported by the mock runtime.
        runtime: Arc<AsyncMutex<MockRuntime>>,
        /// The client view of the tunnel.
        tunnel: Arc<ClientCirc>,
        /// The circuit test context.
        circ: TestCircuitCtx,
        /// The RTT delay to introduce just before each SENDME.
        ///
        /// Used to trigger the client to send a SWITCH.
        rtt_delays: I,
        /// State of the (only) expected stream on this tunnel,
        /// shared by all the mock exit endpoints.
        stream_state: Arc<Mutex<ConfluxStreamState>>,
        /// The number of cells after which to expect a SWITCH
        /// cell from the client.
        expect_switch: Vec<ExpectedSwitch>,
        /// Channel for receiving notifications from the other leg.
        event_rx: mpsc::Receiver<MockExitEvent>,
        /// Channel for sending notifications to the other leg.
        event_tx: mpsc::Sender<MockExitEvent>,
        /// Whether this circuit leg should act as the primary (sending) leg.
        is_sending_leg: bool,
        /// A channel for receiving the cells to send on this leg.
        ///
        /// Each cell comes with a channel for reporting back that it was sent
        /// (see [`CellToSend`]).
        cells_rx: mpsc::Receiver<CellToSend>,
    }
3662
3663    #[cfg(feature = "conflux")]
3664    async fn good_exit_handshake(
3665        runtime: &Arc<AsyncMutex<MockRuntime>>,
3666        init_rtt_delay: Option<Duration>,
3667        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3668        sink: &mut CircuitRxSender,
3669    ) {
3670        // Wait for the LINK cell
3671        let link = await_link_payload(rx).await;
3672
3673        // Introduce an artificial delay, to make one circ have a better initial RTT
3674        // than the other
3675        if let Some(init_rtt_delay) = init_rtt_delay {
3676            runtime.lock().await.advance_by(init_rtt_delay).await;
3677        }
3678
3679        // Reply with a LINKED cell...
3680        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3681        sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3682
3683        // Wait for the client to respond with LINKED_ACK...
3684        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3685        let rmsg = match chmsg {
3686            AnyChanMsg::Relay(r) => {
3687                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3688                    .unwrap()
3689            }
3690            other => panic!("{other:?}"),
3691        };
3692        let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3693
3694        assert!(matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_)));
3695    }
3696
3697    /// An event sent by one mock conflux leg to another.
3698    #[derive(Copy, Clone, Debug)]
3699    enum MockExitEvent {
3700        /// Inform the other leg we are done.
3701        Done,
3702        /// Inform the other leg a stream was opened.
3703        BeginRecvd(StreamId),
3704    }
3705
3706    #[cfg(feature = "conflux")]
3707    async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
3708        state: ConfluxExitState<I>,
3709    ) -> ConfluxEndpointResult {
3710        let ConfluxExitState {
3711            runtime,
3712            tunnel,
3713            mut circ,
3714            rtt_delays,
3715            stream_state,
3716            mut expect_switch,
3717            mut event_tx,
3718            mut event_rx,
3719            is_sending_leg,
3720            mut cells_rx,
3721        } = state;
3722
3723        let mut rtt_delays = rtt_delays.into_iter();
3724
3725        // Expect the client to open a stream, and de-multiplex the received stream data
3726        let stream_len = stream_state.lock().unwrap().expected_data_len;
3727        let mut data_cells_received = 0_usize;
3728        let mut cell_count = 0_usize;
3729        let mut tags = vec![];
3730        let mut streamid = None;
3731        let mut done_writing = false;
3732
3733        loop {
3734            let should_exit = {
3735                let stream_state = stream_state.lock().unwrap();
3736                let done_reading = stream_state.data_recvd.len() >= stream_len;
3737
3738                (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
3739            };
3740
3741            if should_exit {
3742                break;
3743            }
3744
3745            use futures::select;
3746
3747            // Only start reading from the dispatcher channel after the stream is open
3748            // and we're ready to start sending cells.
3749            let mut next_cell = if streamid.is_some() && !done_writing {
3750                Box::pin(cells_rx.next().fuse())
3751                    as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
3752            } else {
3753                Box::pin(std::future::pending().fuse())
3754            };
3755
3756            // Wait for the BEGIN cell to arrive, or for the transfer to complete
3757            // (we need to bail if the other leg already completed);
3758            let res = select! {
3759                res = circ.chan_rx.next() => {
3760                    res.unwrap()
3761                },
3762                res = event_rx.next() => {
3763                    let Some(event) = res else {
3764                        break;
3765                    };
3766
3767                    match event {
3768                        MockExitEvent::Done => {
3769                            break;
3770                        },
3771                        MockExitEvent::BeginRecvd(id) => {
3772                            // The stream is now open (the other leg received the BEGIN),
3773                            // so we're reading to start reading cells from the cell dispatcher.
3774                            streamid = Some(id);
3775                            continue;
3776                        },
3777                    }
3778                }
3779                res = next_cell => {
3780                    if let Some(cell_to_send) = res {
3781                        let CellToSend { cell, done_tx } = cell_to_send;
3782
3783                        // SWITCH cells don't have a stream ID
3784                        let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
3785                            None
3786                        } else {
3787                            streamid
3788                        };
3789
3790                        circ.circ_tx
3791                            .send(rmsg_to_ccmsg(streamid, cell))
3792                            .await
3793                            .unwrap();
3794
3795                        runtime.lock().await.advance_until_stalled().await;
3796                        done_tx.send(()).unwrap();
3797                    } else {
3798                        done_writing = true;
3799                    }
3800
3801                    continue;
3802                }
3803            };
3804
3805            let (_id, chmsg) = res.into_circid_and_msg();
3806            cell_count += 1;
3807            let rmsg = match chmsg {
3808                AnyChanMsg::Relay(r) => {
3809                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3810                        .unwrap()
3811                }
3812                other => panic!("{:?}", other),
3813            };
3814            let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
3815            if streamid.is_none() {
3816                streamid = new_streamid;
3817            }
3818
3819            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3820            let end_recvd = stream_state.lock().unwrap().end_recvd;
3821            match rmsg {
3822                AnyRelayMsg::Begin(_) if begin_recvd => {
3823                    panic!("client tried to open two streams?!");
3824                }
3825                AnyRelayMsg::Begin(_) if !begin_recvd => {
3826                    stream_state.lock().unwrap().begin_recvd = true;
3827                    // Reply with a connected cell...
3828                    let connected = relaymsg::Connected::new_empty().into();
3829                    circ.circ_tx
3830                        .send(rmsg_to_ccmsg(streamid, connected))
3831                        .await
3832                        .unwrap();
3833                    // Tell the other leg we received a BEGIN cell
3834                    event_tx
3835                        .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
3836                        .await
3837                        .unwrap();
3838                }
3839                AnyRelayMsg::End(_) if !end_recvd => {
3840                    stream_state.lock().unwrap().end_recvd = true;
3841                    break;
3842                }
3843                AnyRelayMsg::End(_) if end_recvd => {
3844                    panic!("received two END cells for the same stream?!");
3845                }
3846                AnyRelayMsg::ConfluxSwitch(cell) => {
3847                    // Ensure we got the SWITCH after the expected number of cells
3848                    let expected = expect_switch.remove(0);
3849
3850                    assert_eq!(expected.cells_so_far, cell_count);
3851                    assert_eq!(expected.seqno, cell.seqno());
3852
3853                    // To keep the tests simple, we don't handle out of order cells,
3854                    // and simply sort the received data at the end.
3855                    // This ensures all the data was actually received,
3856                    // but it doesn't actually test that the SWITCH cells
3857                    // contain the appropriate seqnos.
3858                    continue;
3859                }
3860                AnyRelayMsg::Data(dat) => {
3861                    data_cells_received += 1;
3862                    stream_state
3863                        .lock()
3864                        .unwrap()
3865                        .data_recvd
3866                        .extend_from_slice(dat.as_ref());
3867
3868                    let is_next_cell_sendme = data_cells_received % 31 == 0;
3869                    if is_next_cell_sendme {
3870                        if tags.is_empty() {
3871                            // Important: we need to make sure all the SENDMEs
3872                            // we sent so far have been processed by the reactor
3873                            // (otherwise the next QuerySendWindow call
3874                            // might return an outdated list of tags!)
3875                            runtime.lock().await.advance_until_stalled().await;
3876                            let (tx, rx) = oneshot::channel();
3877                            tunnel
3878                                .command
3879                                .unbounded_send(CtrlCmd::QuerySendWindow {
3880                                    hop: 2.into(),
3881                                    leg: circ.unique_id,
3882                                    done: tx,
3883                                })
3884                                .unwrap();
3885
3886                            // Get a fresh batch of tags.
3887                            let (_window, new_tags) = rx.await.unwrap().unwrap();
3888                            tags = new_tags;
3889                        }
3890
3891                        let tag = tags.remove(0);
3892
3893                        // Introduce an artificial delay, to make one circ have worse RTT
3894                        // than the other, and thus trigger a SWITCH
3895                        if let Some(rtt_delay) = rtt_delays.next().flatten() {
3896                            runtime.lock().await.advance_by(rtt_delay).await;
3897                        }
3898                        // Make and send a circuit-level SENDME
3899                        let sendme = relaymsg::Sendme::from(tag).into();
3900
3901                        circ.circ_tx
3902                            .send(rmsg_to_ccmsg(None, sendme))
3903                            .await
3904                            .unwrap();
3905                    }
3906                }
3907                _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
3908            }
3909        }
3910
3911        let end_recvd = stream_state.lock().unwrap().end_recvd;
3912
3913        // Close the stream if the other endpoint hasn't already done so
3914        if is_sending_leg && !end_recvd {
3915            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
3916            circ.circ_tx
3917                .send(rmsg_to_ccmsg(streamid, end))
3918                .await
3919                .unwrap();
3920            stream_state.lock().unwrap().end_sent = true;
3921        }
3922
3923        // This is allowed to fail, because the other leg might have exited first.
3924        let _ = event_tx.send(MockExitEvent::Done).await;
3925
3926        // Ensure we received all the switch cells we were expecting
3927        assert!(
3928            expect_switch.is_empty(),
3929            "expect_switch = {expect_switch:?}"
3930        );
3931
3932        ConfluxEndpointResult::Relay { circ }
3933    }
3934
3935    #[cfg(feature = "conflux")]
3936    async fn run_conflux_client(
3937        tunnel: Arc<ClientCirc>,
3938        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3939        send_data: Vec<u8>,
3940        recv_data: Vec<u8>,
3941    ) -> ConfluxEndpointResult {
3942        let res = conflux_link_rx.await;
3943
3944        let res = res.unwrap().unwrap();
3945        assert_eq!(res.len(), 2);
3946
3947        // All circuit legs have completed the conflux handshake,
3948        // so we now have a multipath tunnel
3949
3950        // Now we're ready to open a stream
3951        let mut stream = tunnel
3952            .begin_stream("www.example.com", 443, None)
3953            .await
3954            .unwrap();
3955
3956        stream.write_all(&send_data).await.unwrap();
3957        stream.flush().await.unwrap();
3958
3959        let mut recv: Vec<u8> = Vec::new();
3960        let recv_len = stream.read_to_end(&mut recv).await.unwrap();
3961        assert_eq!(recv_len, recv_data.len());
3962        assert_eq!(recv_data, recv);
3963
3964        ConfluxEndpointResult::Circuit { tunnel, stream }
3965    }
3966
3967    #[cfg(feature = "conflux")]
3968    async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
3969        endpoint: ConfluxTestEndpoint<I>,
3970    ) -> ConfluxEndpointResult {
3971        match endpoint {
3972            ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3973            ConfluxTestEndpoint::Client {
3974                tunnel,
3975                conflux_link_rx,
3976                send_data,
3977                recv_data,
3978            } => run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await,
3979        }
3980    }
3981
3982    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
3983    // with 2 legs, opens a stream and sends 300 DATA cells on it.
3984    //
3985    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
3986    // which mock the behavior of an exit. The two relay tasks introduce
3987    // artificial delays before each SENDME sent to the client,
3988    // in order to trigger it to switch its sending leg predictably.
3989    //
3990    // The mock exit does not send any data on the stream.
3991    //
3992    // This test checks that the client sends SWITCH cells at the right time,
3993    // and that all the data it sent over the stream arrived at the exit.
3994    //
3995    // Note, however, that it doesn't check that the client sends the data in
3996    // the right order. For simplicity, the test concatenates the data received
3997    // on both legs, sorts it, and then compares it against the of the data sent
3998    // by the client (TODO: improve this)
3999    #[traced_test]
4000    #[test]
4001    #[cfg(feature = "conflux")]
4002    fn multipath_client_to_exit() {
4003        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
4004            /// The number of data cells to send.
4005            const NUM_CELLS: usize = 300;
4006            /// 498 bytes per DATA cell.
4007            const CELL_SIZE: usize = 498;
4008
4009            let TestTunnelCtx {
4010                tunnel,
4011                circs,
4012                conflux_link_rx,
4013            } = setup_good_conflux_tunnel(&rt).await;
4014            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
4015
4016            // The stream data we're going to send over the conflux tunnel
4017            let mut send_data = (0..255_u8)
4018                .cycle()
4019                .take(NUM_CELLS * CELL_SIZE)
4020                .collect::<Vec<_>>();
4021            let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
4022
4023            let mut tasks = vec![];
4024
4025            // Channels used by the mock relays to notify each other
4026            // of various events.
4027            let (tx1, rx1) = mpsc::channel(1);
4028            let (tx2, rx2) = mpsc::channel(1);
4029
4030            // The 9 RTT delays to insert before each of the 9 SENDMEs
4031            // the exit will end up sending.
4032            //
4033            // Note: the first delay is the init_rtt delay (measured during the conflux HS).
4034            let circ1_rtt_delays = [
4035                // Initially, circ1 has better RTT, so we will start on this leg.
4036                Some(Duration::from_millis(100)),
4037                // But then its RTT takes a turn for the worse,
4038                // triggering a switch after the first SENDME is processed
4039                // (this happens after sending 123 DATA cells).
4040                Some(Duration::from_millis(500)),
4041                Some(Duration::from_millis(700)),
4042                Some(Duration::from_millis(900)),
4043                Some(Duration::from_millis(1100)),
4044                Some(Duration::from_millis(1300)),
4045                Some(Duration::from_millis(1500)),
4046                Some(Duration::from_millis(1700)),
4047                Some(Duration::from_millis(1900)),
4048                Some(Duration::from_millis(2100)),
4049            ]
4050            .into_iter();
4051
4052            let circ2_rtt_delays = [
4053                Some(Duration::from_millis(200)),
4054                Some(Duration::from_millis(400)),
4055                Some(Duration::from_millis(600)),
4056                Some(Duration::from_millis(800)),
4057                Some(Duration::from_millis(1000)),
4058                Some(Duration::from_millis(1200)),
4059                Some(Duration::from_millis(1400)),
4060                Some(Duration::from_millis(1600)),
4061                Some(Duration::from_millis(1800)),
4062                Some(Duration::from_millis(2000)),
4063            ]
4064            .into_iter();
4065
4066            let expected_switches1 = vec![ExpectedSwitch {
4067                // We start on this leg, and receive a BEGIN cell,
4068                // followed by (4 * 31 - 1) = 123 DATA cells.
4069                // Then it becomes blocked on CC, then finally the reactor
4070                // realizes it has some SENDMEs to process, and
4071                // then as a result of the new RTT measurement, we switch to circ1,
4072                // and then finally we switch back here, and get another SWITCH
4073                // as the 126th cell.
4074                cells_so_far: 126,
4075                // Leg 2 switches back to this leg after the 249th cell
4076                // (just before sending the 250th one):
4077                // seqno = 125 carried over from leg 1 (see the seqno of the
4078                // SWITCH expected on leg 2 below), plus 1 SWITCH, plus
4079                // 4 * 31 = 124 DATA cells after which the RTT of the first leg
4080                // is deemed favorable again.
4081                //
4082                // 249 - 125 (last_seq_sent of leg 1) = 124
4083                seqno: 124,
4084            }];
4085
4086            let expected_switches2 = vec![ExpectedSwitch {
4087                // The SWITCH is the first cell we received after the conflux HS
4088                // on this leg.
4089                cells_so_far: 1,
4090                // See explanation on the ExpectedSwitch from circ1 above.
4091                seqno: 125,
4092            }];
4093
4094            let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
4095
4096            // Drop the senders and close the channels,
4097            // we have nothing to send in this test.
4098            let (_, cells_rx1) = mpsc::channel(1);
4099            let (_, cells_rx2) = mpsc::channel(1);
4100
4101            let relay1 = ConfluxExitState {
4102                runtime: Arc::clone(&relay_runtime),
4103                tunnel: Arc::clone(&tunnel),
4104                circ: circ1,
4105                rtt_delays: circ1_rtt_delays,
4106                stream_state: Arc::clone(&stream_state),
4107                expect_switch: expected_switches1,
4108                event_tx: tx1,
4109                event_rx: rx2,
4110                is_sending_leg: true,
4111                cells_rx: cells_rx1,
4112            };
4113
4114            let relay2 = ConfluxExitState {
4115                runtime: Arc::clone(&relay_runtime),
4116                tunnel: Arc::clone(&tunnel),
4117                circ: circ2,
4118                rtt_delays: circ2_rtt_delays,
4119                stream_state: Arc::clone(&stream_state),
4120                expect_switch: expected_switches2,
4121                event_tx: tx2,
4122                event_rx: rx1,
4123                is_sending_leg: false,
4124                cells_rx: cells_rx2,
4125            };
4126
4127            for mut mock_relay in [relay1, relay2] {
4128                let leg = mock_relay.circ.unique_id;
4129
4130                // Do the conflux handshake
4131                //
4132                // We do this outside of run_conflux_endpoint,
4133                // toa void running both handshakes at concurrently
4134                // (this gives more predictable RTT delays:
4135                // if both handshake tasks run at once, they race
4136                // to advance the mock runtime's clock)
4137                good_exit_handshake(
4138                    &relay_runtime,
4139                    mock_relay.rtt_delays.next().flatten(),
4140                    &mut mock_relay.circ.chan_rx,
4141                    &mut mock_relay.circ.circ_tx,
4142                )
4143                .await;
4144
4145                let relay = ConfluxTestEndpoint::Relay(mock_relay);
4146
4147                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
4148            }
4149
4150            tasks.push(rt.spawn_join(
4151                "client task".to_string(),
4152                run_conflux_endpoint(ConfluxTestEndpoint::Client {
4153                    tunnel,
4154                    conflux_link_rx,
4155                    send_data: send_data.clone(),
4156                    recv_data: vec![],
4157                }),
4158            ));
4159            let _sinks = futures::future::join_all(tasks).await;
4160            let mut stream_state = stream_state.lock().unwrap();
4161            assert!(stream_state.begin_recvd);
4162
4163            stream_state.data_recvd.sort();
4164            send_data.sort();
4165            assert_eq!(stream_state.data_recvd, send_data);
4166        });
4167    }
4168
4169    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
4170    // with 2 legs, opens a stream and reads from the stream until the stream is closed.
4171    //
4172    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
4173    // which mock the behavior of an exit. The two tasks send DATA and SWITCH
4174    // cells on the two circuit "legs" such that some cells arrive out of order.
4175    // This forces the client to buffer some cells, and then reorder them when
4176    // the missing cells finally arrive.
4177    //
4178    // The client does not send any data on the stream.
4179    #[cfg(feature = "conflux")]
4180    async fn run_multipath_exit_to_client_test(
4181        rt: MockRuntime,
4182        tunnel: TestTunnelCtx,
4183        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
4184        send_data: Vec<u8>,
4185        recv_data: Vec<u8>,
4186    ) -> Arc<Mutex<ConfluxStreamState>> {
4187        let TestTunnelCtx {
4188            tunnel,
4189            circs,
4190            conflux_link_rx,
4191        } = tunnel;
4192        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
4193
4194        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
4195
4196        let mut tasks = vec![];
4197        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
4198        let (cells_tx1, cells_rx1) = mpsc::channel(1);
4199        let (cells_tx2, cells_rx2) = mpsc::channel(1);
4200
4201        let dispatcher = CellDispatcher {
4202            leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
4203                .into_iter()
4204                .collect(),
4205            cells_to_send,
4206        };
4207
4208        // Channels used by the mock relays to notify each other
4209        // of various events.
4210        let (tx1, rx1) = mpsc::channel(1);
4211        let (tx2, rx2) = mpsc::channel(1);
4212
4213        let relay1 = ConfluxExitState {
4214            runtime: Arc::clone(&relay_runtime),
4215            tunnel: Arc::clone(&tunnel),
4216            circ: circ1,
4217            rtt_delays: [].into_iter(),
4218            stream_state: Arc::clone(&stream_state),
4219            // Expect no SWITCH cells from the client
4220            expect_switch: vec![],
4221            event_tx: tx1,
4222            event_rx: rx2,
4223            is_sending_leg: false,
4224            cells_rx: cells_rx1,
4225        };
4226
4227        let relay2 = ConfluxExitState {
4228            runtime: Arc::clone(&relay_runtime),
4229            tunnel: Arc::clone(&tunnel),
4230            circ: circ2,
4231            rtt_delays: [].into_iter(),
4232            stream_state: Arc::clone(&stream_state),
4233            // Expect no SWITCH cells from the client
4234            expect_switch: vec![],
4235            event_tx: tx2,
4236            event_rx: rx1,
4237            is_sending_leg: true,
4238            cells_rx: cells_rx2,
4239        };
4240
4241        // Run the cell dispatcher, which tells each exit leg task
4242        // what cells to write.
4243        //
4244        // This enables us to write out-of-order cells deterministically.
4245        rt.spawn(dispatcher.run()).unwrap();
4246
4247        for mut mock_relay in [relay1, relay2] {
4248            let leg = mock_relay.circ.unique_id;
4249
4250            good_exit_handshake(
4251                &relay_runtime,
4252                mock_relay.rtt_delays.next().flatten(),
4253                &mut mock_relay.circ.chan_rx,
4254                &mut mock_relay.circ.circ_tx,
4255            )
4256            .await;
4257
4258            let relay = ConfluxTestEndpoint::Relay(mock_relay);
4259
4260            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
4261        }
4262
4263        tasks.push(rt.spawn_join(
4264            "client task".to_string(),
4265            run_conflux_endpoint(ConfluxTestEndpoint::Client {
4266                tunnel,
4267                conflux_link_rx,
4268                send_data: send_data.clone(),
4269                recv_data,
4270            }),
4271        ));
4272
4273        // Wait for all the tasks to complete
4274        let _sinks = futures::future::join_all(tasks).await;
4275
4276        stream_state
4277    }
4278
4279    #[traced_test]
4280    #[test]
4281    #[cfg(feature = "conflux")]
4282    fn multipath_exit_to_client() {
4283        // The data we expect the client to read from the stream
4284        const TO_SEND: &[u8] =
4285            b"But something about Buster Friendly irritated John Isidore, one specific thing";
4286
4287        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
4288            // The indices of the tunnel legs.
4289            const CIRC1: usize = 0;
4290            const CIRC2: usize = 1;
4291
4292            // The client receives the following cells, in the order indicated
4293            // by the t0-t8 "timestamps" (where C = CONNECTED, D = DATA, E = END,
4294            // S = SWITCH):
4295            //
4296            //  Leg 1 (CIRC1):   -----------D--------------------- D -- D -- C
4297            //                              |                      |    |    | \
4298            //                              |                      |    |    |  v
4299            //                              |                      |    |    | client
4300            //                              |                      |    |    |  ^
4301            //                              |                      |    |    |/
4302            //  Leg 2 (CIRC2): E - D -- D --\--- D* -- S (seqno=4)-/----/----/
4303            //                 |   |    |   |    |       |         |    |    |
4304            //                 |   |    |   |    |       |         |    |    |
4305            //                 |   |    |   |    |       |         |    |    |
4306            //  Time:          t8  t7   t6  t5   t4      t3        t2   t1  t0
4307            //
4308            //
4309            //  The cells marked with * are out of order.
4310            //
4311            // Note: t0 is the time when the client receives the first cell,
4312            // and t8 is the time when it receives the last one.
4313            // In other words, this test simulates a mock exit that "sent" the cells
4314            // in the order t0, t1, t2, t5, t4, t6, t7, t8
4315            let simple_switch = vec![
4316                (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
4317                (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
4318                // Switch to sending on the second leg
4319                (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
4320                // An out of order cell!
4321                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
4322                // The missing cell (as indicated by seqno = 4 from the switch cell above)
4323                // is finally arriving on leg1
4324                (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
4325                (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
4326                (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
4327            ];
4328
4329            //  Leg 1 (CIRC1): ---------------- D  ------D* --- S(seqno = 3) -- D - D ---------------------------- C
4330            //                                  |        |          |           |   |                              | \
4331            //                                  |        |          |           |   |                              |  v
4332            //                                  |        |          |           |   |                              |  client
4333            //                                  |        |          |           |   |                              |  ^
4334            //                                  |        |          |           |   |                              | /
4335            //  Leg 2 (CIRC2): E - S(seqno = 2) \ -- D --\----------\---------- \ --\--- D* -- D* - S(seqno = 3) --/
4336            //                 |        |       |    |   |          |           |   |    |     |         |         |
4337            //                 |        |       |    |   |          |           |   |    |     |         |         |
4338            //                 |        |       |    |   |          |           |   |    |     |         |         |
4339            //  Time:          t11      t10     t9   t8  t7         t6          t5  t4   t3    t2        t1        t0
4340            //  =====================================================================================================
4341            //  Leg 1 LSR:      8        8      8 7  7   7          6           3   2    1      1        1         1
4342            //  Leg 2 LSR:      9        8      6 6  6   5          5           5   5    5      4        3         0
4343            //  LSD:            9        8      8 7  6   5          5       5   3   2    1      1        1         1
4344            //                                    ^ OOO cell is delivered   ^ the OOO cells are delivered to the stream
4345            //
4346            //
4347            //  (LSR = last seq received, LSD = last seq delivered, both from the client's POV)
4348            //
4349            //
4350            // The client keeps track of the `last_seqno_received` (LSR) on each leg.
4351            // This is incremented for each cell that counts towards the seqnos (BEGIN, DATA, etc.)
4352            // that is received on the leg. The client also tracks the `last_seqno_delivered` (LSD),
4353            // which is the seqno of the last cell delivered to a stream
4354            // (this is global for the whole tunnel, whereas the LSR is different for each leg).
4355            //
4356            // When switching to leg `N`, the seqno in the switch is, from the POV of the sender,
4357            // the delta between the absolute seqno (i.e. the total number of cells[^1] sent)
4358            // and the value of this absolute seqno when leg `N` was last used.
4359            //
4360            // At the time of the first SWITCH from `t1`, the exit "sent" 3 cells:
4361            // a `CONNECTED` cell, which was received by the client at `t0`, and 2 `DATA` cells that
4362            // haven't been received yet. At this point, the exit decides to switch to leg 2,
4363            // on which it hasn't sent any cells yet, so the seqno is set to `3 - 0 = 3`.
4364            //
4365            // At `t6` when the exit sends the second switch (leg 2 -> leg 1), has "sent" 6 cells
4366            // (`C` plus the data cells that are received at `t1 - 5` and `t8`.
4367            // The seqno is `6 - 3 = 3`, because when it last sent on leg 1,
4368            // the absolute seqno was `3`.
4369            //
4370            // At `t10`, the absolute seqno is 8 (8 qualifying cells have been sent so far).
4371            // When the exit last sent on leg 2 (which we are switching to),
4372            // the absolute seqno was `6`, so the `SWITCH` cell will have `8 - 6 = 2` as the seqno.
4373            //
4374            // [^1]: only counting the cells that count towards sequence numbers
4375            let multiple_switches = vec![
4376                // Immediately switch to sending on the second leg
4377                // (indicating that we've already sent 3 cells (including the CONNECTED)
4378                (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
4379                // Two out of order cells!
4380                (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
4381                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
4382                // The missing cells finally arrive on the first leg
4383                (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
4384                (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
4385                // Switch back to the first leg
4386                (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
4387                // OOO cell
4388                (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
4389                // Missing cell is received
4390                (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
4391                // The remaining cells are in-order
4392                (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
4393                // Switch right after we've sent all the data we had to send
4394                (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
4395            ];
4396
4397            // TODO: give these tests the ability to control when END cells are sent
4398            // (currently we have ensure the is_sending_leg is set to true
4399            // on the leg that ends up sending the last data cell).
4400            //
4401            // TODO: test the edge cases
4402            let tests = [simple_switch, multiple_switches];
4403
4404            for cells_to_send in tests {
4405                let tunnel = setup_good_conflux_tunnel(&rt).await;
4406                assert_eq!(tunnel.circs.len(), 2);
4407                let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
4408                let cells_to_send = cells_to_send
4409                    .into_iter()
4410                    .map(|(i, cell)| (circ_ids[i], cell))
4411                    .collect();
4412
4413                // The client won't be sending any DATA cells on this stream
4414                let send_data = vec![];
4415                let stream_state = run_multipath_exit_to_client_test(
4416                    rt.clone(),
4417                    tunnel,
4418                    cells_to_send,
4419                    send_data.clone(),
4420                    TO_SEND.into(),
4421                )
4422                .await;
4423                let stream_state = stream_state.lock().unwrap();
4424                assert!(stream_state.begin_recvd);
4425                // We don't expect the client to have sent anything
4426                assert!(stream_state.data_recvd.is_empty());
4427            }
4428        });
4429    }
4430
4431    #[traced_test]
4432    #[test]
4433    #[cfg(all(feature = "conflux", feature = "hs-service"))]
4434    fn conflux_incoming_stream() {
4435        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
4436            use std::error::Error as _;
4437
4438            const EXPECTED_HOP: u8 = 1;
4439
4440            let TestTunnelCtx {
4441                tunnel,
4442                circs,
4443                conflux_link_rx,
4444            } = setup_good_conflux_tunnel(&rt).await;
4445
4446            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
4447
4448            let link = await_link_payload(&mut circ1.chan_rx).await;
4449            for circ in [&mut circ1, &mut circ2] {
4450                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
4451                circ.circ_tx
4452                    .send(rmsg_to_ccmsg(None, linked))
4453                    .await
4454                    .unwrap();
4455            }
4456
4457            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
4458            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
4459
4460            // TODO(#2002): we don't currently support conflux for onion services
4461            let err = tunnel
4462                .allow_stream_requests(
4463                    &[tor_cell::relaycell::RelayCmd::BEGIN],
4464                    (tunnel.unique_id(), EXPECTED_HOP.into()).into(),
4465                    AllowAllStreamsFilter,
4466                )
4467                .await
4468                // IncomingStream doesn't impl Debug, so we need to map to a different type
4469                .map(|_| ())
4470                .unwrap_err();
4471
4472            let err_src = err.source().unwrap();
4473            assert!(err_src
4474                .to_string()
4475                .contains("Cannot allow stream requests on tunnel with 2 legs"));
4476        });
4477    }
4478}