tor_proto/tunnel/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
3//! Right now, we only implement "client circuits" -- also sometimes
4//! called "origin circuits".  A client circuit is one that is
5//! constructed by this Tor instance, and used in its own behalf to
6//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_circ] method.  This yields
14//! a [PendingClientCirc] object that won't become live until you call
15//! one of the methods
16//! (typically [`PendingClientCirc::create_firsthop`])
17//! that extends it to its first hop.  After you've
18//! done that, you can call [`ClientCirc::extend`] on the circuit to
19//! build it into a multi-hop circuit.  Finally, you can use
20//! [ClientCirc::begin_stream] to get a Stream object that can be used
21//! for anonymized data.
22//!
23//! # Implementation
24//!
25//! Each open circuit has a corresponding Reactor object that runs in
26//! an asynchronous task, and manages incoming cells from the
27//! circuit's upstream channel.  These cells are either RELAY cells or
28//! DESTROY cells.  DESTROY cells are handled immediately.
29//! RELAY cells are either for a particular stream, in which case they
30//! get forwarded to a RawCellStream object, or for no particular stream,
31//! in which case they are considered "meta" cells (like EXTENDED2)
32//! that should only get accepted if something is waiting for them.
33//!
34//! # Limitations
35//!
36//! This is client-only.
37
38pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::congestion::params::CongestionControlParams;
51use crate::crypto::cell::HopNum;
52use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53use crate::memquota::{CircuitAccount, SpecificAccount as _};
54use crate::stream::{
55    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56    StreamReader,
57};
58use crate::tunnel::circuit::celltypes::*;
59use crate::tunnel::reactor::CtrlCmd;
60use crate::tunnel::reactor::{
61    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62};
63use crate::tunnel::{StreamTarget, TargetHop};
64use crate::util::skew::ClockSkew;
65use crate::{Error, ResolveError, Result};
66use educe::Educe;
67use path::HopDetail;
68use tor_cell::{
69    chancell::CircId,
70    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
71};
72
73use tor_error::{bad_api_usage, internal, into_internal};
74use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
75use tor_protover::named;
76
77pub use crate::crypto::binding::CircuitBinding;
78pub use crate::memquota::StreamAccount;
79pub use crate::tunnel::circuit::unique_id::UniqId;
80
81#[cfg(feature = "hs-service")]
82use {
83    crate::stream::{IncomingCmdChecker, IncomingStream},
84    crate::tunnel::reactor::StreamReqInfo,
85};
86
87use futures::channel::mpsc;
88use oneshot_fused_workaround as oneshot;
89
90use crate::congestion::sendme::StreamRecvWindow;
91use crate::DynTimeProvider;
92use futures::FutureExt as _;
93use std::collections::HashMap;
94use std::net::IpAddr;
95use std::sync::{Arc, Mutex};
96use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
97
98use crate::crypto::handshake::ntor::NtorPublicKey;
99
100pub use path::{Path, PathEntry};
101
102/// The size of the buffer for communication between `ClientCirc` and its reactor.
103pub const CIRCUIT_BUFFER_SIZE: usize = 128;
104
105#[cfg(feature = "send-control-msg")]
106use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
107
108pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
109#[cfg(feature = "send-control-msg")]
110#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
111pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
112
113/// MPSC queue relating to a stream (either inbound or outbound), sender
114pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
115/// MPSC queue relating to a stream (either inbound or outbound), receiver
116pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
117
118/// MPSC queue for inbound data on its way from channel to circuit, sender
119pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
120/// MPSC queue for inbound data on its way from channel to circuit, receiver
121pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
122
123#[derive(Debug)]
124/// A circuit that we have constructed over the Tor network.
125///
126/// # Circuit life cycle
127///
128/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
129/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
130/// one of these, you invoke one of its `create_firsthop` methods (typically
131/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
132/// [`create_firsthop()`](PendingClientCirc::create_firsthop)).
133/// Then, to add more hops to the circuit, you can call
134/// [`extend()`](ClientCirc::extend) on it.
135///
136/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
137/// `tor-proto` are probably not what you need.
138///
139/// After a circuit is created, it will persist until it is closed in one of
140/// five ways:
141///    1. A remote error occurs.
142///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
143///       circuit.
144///    3. The circuit's channel is closed.
145///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
146///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
147///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
148///       circuit from closing until all those streams have gone away.)
149///
150/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
151/// will just be unusable for most purposes.  Most operations on it will fail
152/// with an error.
153//
154// Effectively, this struct contains two Arcs: one for `path` and one for
155// `control` (which surely has something Arc-like in it).  We cannot unify
156// these by putting a single Arc around the whole struct, and passing
157// an Arc strong reference to the `Reactor`, because then `control` would
158// not be dropped when the last user of the circuit goes away.  We could
159// make the reactor have a weak reference but weak references are more
160// expensive to dereference.
161//
162// Because of the above, cloning this struct is always going to involve
163// two atomic refcount changes/checks.  Wrapping it in another Arc would
164// be overkill.
165//
166pub struct ClientCirc {
167    /// Mutable state shared with the `Reactor`.
168    mutable: Arc<TunnelMutableState>,
169    /// A unique identifier for this circuit.
170    unique_id: UniqId,
171    /// Channel to send control messages to the reactor.
172    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
173    /// Channel to send commands to the reactor.
174    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
175    /// A future that resolves to Cancelled once the reactor is shut down,
176    /// meaning that the circuit is closed.
177    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
178    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
179    /// For testing purposes: the CircId, for use in peek_circid().
180    #[cfg(test)]
181    circid: CircId,
182    /// Memory quota account
183    memquota: CircuitAccount,
184    /// Time provider
185    time_provider: DynTimeProvider,
186}
187
188/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
189///
190/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
191/// but it is currently the best option we have for sharing
192/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
193/// In practice, these mutexes won't be accessed very often
194/// (they're accessed for writing when a circuit is extended,
195/// and for reading by the various `ClientCirc` APIs),
196/// so they shouldn't really impact performance.
197///
198/// Alternatively, the circuit state information could be shared
199/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
200/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
201/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
202/// asynchronous, which will significantly complicate their callsites,
203/// which would in turn need to be made async too.
204///
205/// We should revisit this decision at some point, and decide whether an async API
206/// would be preferable.
207#[derive(Debug, Default)]
208pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
209
210impl TunnelMutableState {
211    /// Add the [`MutableState`] of a circuit.
212    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
213        #[allow(unused)] // unused in non-debug builds
214        let state = self
215            .0
216            .lock()
217            .expect("lock poisoned")
218            .insert(unique_id, mutable);
219
220        debug_assert!(state.is_none());
221    }
222
223    /// Remove the [`MutableState`] of a circuit.
224    pub(super) fn remove(&self, unique_id: UniqId) {
225        #[allow(unused)] // unused in non-debug builds
226        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
228        debug_assert!(state.is_some());
229    }
230
231    /// Return a [`Path`] object describing all the hops in the specified circuit.
232    ///
233    /// See [`MutableState::path`].
234    fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
235        let lock = self.0.lock().expect("lock poisoned");
236        let mutable = lock
237            .get(&unique_id)
238            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
239
240        Ok(mutable.path())
241    }
242
243    /// Return a description of the first hop of this circuit.
244    ///
245    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
246    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
247    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
248        let lock = self.0.lock().expect("lock poisoned");
249        let mutable = lock
250            .get(&unique_id)
251            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
252
253        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
254            path::HopDetail::Relay(r) => r,
255            #[cfg(feature = "hs-common")]
256            path::HopDetail::Virtual => {
257                panic!("somehow made a circuit with a virtual first hop.")
258            }
259        });
260
261        Ok(first_hop)
262    }
263
264    /// Return the [`HopNum`] of the last hop of the specified circuit.
265    ///
266    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
267    ///
268    /// See [`MutableState::last_hop_num`].
269    fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
270        let lock = self.0.lock().expect("lock poisoned");
271        let mutable = lock
272            .get(&unique_id)
273            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
274
275        Ok(mutable.last_hop_num())
276    }
277
278    /// Return the number of hops in the specified circuit.
279    ///
280    /// See [`MutableState::n_hops`].
281    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
282        let lock = self.0.lock().expect("lock poisoned");
283        let mutable = lock
284            .get(&unique_id)
285            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
286
287        Ok(mutable.n_hops())
288    }
289
290    /// Return the cryptographic material used to prove knowledge of a shared
291    /// secret with with `hop` on the circuit with the specified `unique_id`.
292    fn binding_key(&self, unique_id: UniqId, hop: HopNum) -> Result<Option<CircuitBinding>> {
293        let lock = self.0.lock().expect("lock poisoned");
294        let mutable = lock
295            .get(&unique_id)
296            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297
298        Ok(mutable.binding_key(hop))
299    }
300
301    /// Return the number of legs in this tunnel.
302    ///
303    /// TODO(conflux-fork): this can be removed once we modify `path_ref`
304    /// to return *all* the Paths in the tunnel.
305    fn n_legs(&self) -> usize {
306        let lock = self.0.lock().expect("lock poisoned");
307        lock.len()
308    }
309}
310
311/// The mutable state of a circuit.
312#[derive(Educe, Default)]
313#[educe(Debug)]
314pub(super) struct MutableState(Mutex<CircuitState>);
315
316impl MutableState {
317    /// Add a hop to the path of this circuit.
318    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
319        let mut mutable = self.0.lock().expect("poisoned lock");
320        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
321        mutable.binding.push(binding);
322    }
323
324    /// Get a copy of the circuit's current [`path::Path`].
325    pub(super) fn path(&self) -> Arc<path::Path> {
326        let mutable = self.0.lock().expect("poisoned lock");
327        Arc::clone(&mutable.path)
328    }
329
330    /// Return the cryptographic material used to prove knowledge of a shared
331    /// secret with with `hop`.
332    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
333        let mutable = self.0.lock().expect("poisoned lock");
334
335        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
336        // NOTE: I'm not thrilled to have to copy this information, but we use
337        // it very rarely, so it's not _that_ bad IMO.
338    }
339
340    /// Return a description of the first hop of this circuit.
341    fn first_hop(&self) -> Option<HopDetail> {
342        let mutable = self.0.lock().expect("poisoned lock");
343        mutable.path.first_hop()
344    }
345
346    /// Return the [`HopNum`] of the last hop of this circuit.
347    ///
348    /// NOTE: This function will return the [`HopNum`] of the hop
349    /// that is _currently_ the last. If there is an extend operation in progress,
350    /// the currently pending hop may or may not be counted, depending on whether
351    /// the extend operation finishes before this call is done.
352    fn last_hop_num(&self) -> Option<HopNum> {
353        let mutable = self.0.lock().expect("poisoned lock");
354        mutable.path.last_hop_num()
355    }
356
357    /// Return the number of hops in this circuit.
358    ///
359    /// NOTE: This function will currently return only the number of hops
360    /// _currently_ in the circuit. If there is an extend operation in progress,
361    /// the currently pending hop may or may not be counted, depending on whether
362    /// the extend operation finishes before this call is done.
363    fn n_hops(&self) -> usize {
364        let mutable = self.0.lock().expect("poisoned lock");
365        mutable.path.n_hops()
366    }
367}
368
369/// The shared state of a circuit.
370#[derive(Educe, Default)]
371#[educe(Debug)]
372pub(super) struct CircuitState {
373    /// Information about this circuit's path.
374    ///
375    /// This is stored in an Arc so that we can cheaply give a copy of it to
376    /// client code; when we need to add a hop (which is less frequent) we use
377    /// [`Arc::make_mut()`].
378    path: Arc<path::Path>,
379
380    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
381    /// in the circuit's path.
382    ///
383    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
384    /// fair chance that this will change in the future, and I don't want other
385    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
386    /// an `Option`.
387    #[educe(Debug(ignore))]
388    binding: Vec<Option<CircuitBinding>>,
389}
390
391/// A ClientCirc that needs to send a create cell and receive a created* cell.
392///
393/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
394/// to negotiate the cryptographic handshake with the first hop.
395pub struct PendingClientCirc {
396    /// A oneshot receiver on which we'll receive a CREATED* cell,
397    /// or a DESTROY cell.
398    recvcreated: oneshot::Receiver<CreateResponse>,
399    /// The ClientCirc object that we can expose on success.
400    circ: Arc<ClientCirc>,
401}
402
403/// Description of the network's current rules for building circuits.
404///
405/// This type describes rules derived from the consensus,
406/// and possibly amended by our own configuration.
407///
408/// Typically, this type created once for an entire circuit,
409/// and any special per-hop information is derived
410/// from each hop as a CircTarget.
411/// Note however that callers _may_ provide different `CircParameters`
412/// for different hops within a circuit if they have some reason to do so,
413/// so we do not enforce that every hop in a circuit has the same `CircParameters`.
414#[non_exhaustive]
415#[derive(Clone, Debug)]
416pub struct CircParameters {
417    /// Whether we should include ed25519 identities when we send
418    /// EXTEND2 cells.
419    pub extend_by_ed25519_id: bool,
420    /// Congestion control parameters for this circuit.
421    pub ccontrol: CongestionControlParams,
422}
423
424/// The settings we use for single hop of a circuit.
425///
426/// Unlike [`CircParameters`], this type is crate-internal.
427/// We construct it based on our settings from the circuit,
428/// and from the hop's actual capabilities.
429/// Then, we negotiate with the hop as part of circuit
430/// creation/extension to determine the actual settings that will be in use.
431/// Finally, we use those settings to construct the negotiated circuit hop.
432//
433// TODO: Relays should probably derive an instance of this type too, as
434// part of the circuit creation handshake.
435#[derive(Clone, Debug)]
436pub(super) struct HopSettings {
437    /// The negotiated congestion control settings for this circuit.
438    pub(super) ccontrol: CongestionControlParams,
439}
440
441impl HopSettings {
442    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
443    /// and `caps` (a set of protocol capabilities for a circuit target).
444    ///
445    /// The resulting settings will represent what the client would prefer to negotiate
446    /// (determined by `params`),
447    /// as modified by what the target relay is believed to support (represented by `caps`).
448    ///
449    /// This represents the `HopSettings` in a pre-negotiation state:
450    /// the circuit negotiation process will modify it.
451    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
452    pub(super) fn from_params_and_caps(
453        params: &CircParameters,
454        caps: &tor_protover::Protocols,
455    ) -> Result<Self> {
456        let mut settings = Self {
457            ccontrol: params.ccontrol.clone(),
458        };
459
460        match settings.ccontrol.alg() {
461            crate::ccparams::Algorithm::FixedWindow(_) => {}
462            crate::ccparams::Algorithm::Vegas(_) => {
463                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
464                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
465                    settings.ccontrol.use_fallback_alg();
466                }
467            }
468        }
469
470        Ok(settings)
471    }
472
473    /// Return a new `HopSettings` based on this one,
474    /// representing the settings that we should use
475    /// if circuit negotiation will be impossible.
476    ///
477    /// (Circuit negotiation is impossible when using the legacy ntor protocol,
478    /// and when using CRATE_FAST.  It is currently unsupported with virtual hops.)
479    pub(super) fn without_negotiation(mut self) -> Self {
480        self.ccontrol.use_fallback_alg();
481        self
482    }
483}
484
485#[cfg(test)]
486impl std::default::Default for CircParameters {
487    fn default() -> Self {
488        Self {
489            extend_by_ed25519_id: true,
490            ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
491        }
492    }
493}
494
495impl CircParameters {
496    /// Constructor
497    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
498        Self {
499            extend_by_ed25519_id,
500            ccontrol,
501        }
502    }
503}
504
505impl ClientCirc {
506    /// Return a description of the first hop of this circuit.
507    ///
508    /// # Panics
509    ///
510    /// Panics if there is no first hop.  (This should be impossible outside of
511    /// the tor-proto crate, but within the crate it's possible to have a
512    /// circuit with no hops.)
513    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
514        Ok(self
515            .mutable
516            .first_hop(self.unique_id)
517            .map_err(|_| Error::CircuitClosed)?
518            .expect("called first_hop on an un-constructed circuit"))
519    }
520
521    /// Return the [`HopNum`] of the last hop of this circuit.
522    ///
523    /// Returns an error if there is no last hop.  (This should be impossible outside of the
524    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
525    ///
526    /// NOTE: This function will return the [`HopNum`] of the hop
527    /// that is _currently_ the last. If there is an extend operation in progress,
528    /// the currently pending hop may or may not be counted, depending on whether
529    /// the extend operation finishes before this call is done.
530    pub fn last_hop_num(&self) -> Result<HopNum> {
531        Ok(self
532            .mutable
533            .last_hop_num(self.unique_id)?
534            .ok_or_else(|| internal!("no last hop index"))?)
535    }
536
537    /// Return a [`Path`] object describing all the hops in this circuit.
538    ///
539    /// Note that this `Path` is not automatically updated if the circuit is
540    /// extended.
541    pub fn path_ref(&self) -> Result<Arc<Path>> {
542        self.mutable
543            .path_ref(self.unique_id)
544            .map_err(|_| Error::CircuitClosed)
545    }
546
547    /// Get the clock skew claimed by the first hop of the circuit.
548    ///
549    /// See [`Channel::clock_skew()`].
550    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
551        let (tx, rx) = oneshot::channel();
552
553        self.control
554            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
555            .map_err(|_| Error::CircuitClosed)?;
556
557        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
558    }
559
560    /// Return a reference to this circuit's memory quota account
561    pub fn mq_account(&self) -> &CircuitAccount {
562        &self.memquota
563    }
564
565    /// Return the cryptographic material used to prove knowledge of a shared
566    /// secret with with `hop`.
567    ///
568    /// See [`CircuitBinding`] for more information on how this is used.
569    ///
570    /// Return None if we have no circuit binding information for the hop, or if
571    /// the hop does not exist.
572    pub fn binding_key(&self, hop: HopNum) -> Result<Option<CircuitBinding>> {
573        self.mutable
574            .binding_key(self.unique_id, hop)
575            .map_err(|_| Error::CircuitClosed)
576    }
577
578    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
579    ///
580    /// To use this:
581    ///
582    ///  0. Create an inter-task channel you'll use to receive
583    ///     the outcome of your conversation,
584    ///     and bundle it into a [`MsgHandler`].
585    ///
586    ///  1. Call `start_conversation`.
587    ///     This will install a your handler, for incoming messages,
588    ///     and send the outgoing message (if you provided one).
589    ///     After that, each message on the circuit
590    ///     that isn't handled by the core machinery
591    ///     is passed to your provided `reply_handler`.
592    ///
593    ///  2. Possibly call `send_msg` on the [`Conversation`],
594    ///     from the call site of `start_conversation`,
595    ///     possibly multiple times, from time to time,
596    ///     to send further desired messages to the peer.
597    ///
598    ///  3. In your [`MsgHandler`], process the incoming messages.
599    ///     You may respond by
600    ///     sending additional messages
601    ///     When the protocol exchange is finished,
602    ///     `MsgHandler::handle_msg` should return
603    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
604    ///
605    /// If you don't need the `Conversation` to send followup messages,
606    /// you may simply drop it,
607    /// and rely on the responses you get from your handler,
608    /// on the channel from step 0 above.
609    /// Your handler will remain installed and able to process incoming messages
610    /// until it returns `ConversationFinished`.
611    ///
612    /// (If you don't want to accept any replies at all, it may be
613    /// simpler to use [`ClientCirc::send_raw_msg`].)
614    ///
615    /// Note that it is quite possible to use this function to violate the tor
616    /// protocol; most users of this API will not need to call it.  It is used
617    /// to implement most of the onion service handshake.
618    ///
619    /// # Limitations
620    ///
621    /// Only one conversation may be active at any one time,
622    /// for any one circuit.
623    /// This generally means that this function should not be called
624    /// on a circuit which might be shared with anyone else.
625    ///
626    /// Likewise, it is forbidden to try to extend the circuit,
627    /// while the conversation is in progress.
628    ///
629    /// After the conversation has finished, the circuit may be extended.
630    /// Or, `start_conversation` may be called again;
631    /// but, in that case there will be a gap between the two conversations,
632    /// during which no `MsgHandler` is installed,
633    /// and unexpected incoming messages would close the circuit.
634    ///
635    /// If these restrictions are violated, the circuit will be closed with an error.
636    ///
637    /// ## Precise definition of the lifetime of a conversation
638    ///
639    /// A conversation is in progress from entry to `start_conversation`,
640    /// until entry to the body of the [`MsgHandler::handle_msg`]
641    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
642    /// (*Entry* since `handle_msg` is synchronously embedded
643    /// into the incoming message processing.)
644    /// So you may start a new conversation as soon as you have the final response
645    /// via your inter-task channel from (0) above.
646    ///
647    /// The lifetime relationship of the [`Conversation`],
648    /// vs the handler returning `ConversationFinished`
649    /// is not enforced by the type system.
650    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
651    // at least while allowing sending followup messages from outside the handler.
652    //
653    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
654    //   tor-proto interface.
655    #[cfg(feature = "send-control-msg")]
656    pub async fn start_conversation(
657        &self,
658        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
659        reply_handler: impl MsgHandler + Send + 'static,
660        hop_num: HopNum,
661    ) -> Result<Conversation<'_>> {
662        let handler = Box::new(UserMsgHandler::new(hop_num, reply_handler));
663        let conversation = Conversation(self);
664        conversation.send_internal(msg, Some(handler)).await?;
665        Ok(conversation)
666    }
667
668    /// Send an ad-hoc message to a given hop on the circuit, without expecting
669    /// a reply.
670    ///
671    /// (If you want to handle one or more possible replies, see
672    /// [`ClientCirc::start_conversation`].)
673    #[cfg(feature = "send-control-msg")]
674    pub async fn send_raw_msg(
675        &self,
676        msg: tor_cell::relaycell::msg::AnyRelayMsg,
677        hop_num: HopNum,
678    ) -> Result<()> {
679        let (sender, receiver) = oneshot::channel();
680        let ctrl_msg = CtrlMsg::SendMsg {
681            hop_num,
682            msg,
683            sender,
684        };
685        self.control
686            .unbounded_send(ctrl_msg)
687            .map_err(|_| Error::CircuitClosed)?;
688
689        receiver.await.map_err(|_| Error::CircuitClosed)?
690    }
691
692    /// Tell this circuit to begin allowing the final hop of the circuit to try
693    /// to create new Tor streams, and to return those pending requests in an
694    /// asynchronous stream.
695    ///
696    /// Ordinarily, these requests are rejected.
697    ///
698    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
699    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
700    /// an error.
701    ///
702    /// After this method has been called on a circuit, the circuit is expected
703    /// to receive requests of this type indefinitely, until it is finally closed.
704    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
705    ///
706    /// Only onion services (and eventually) exit relays should call this
707    /// method.
708    //
709    // TODO: Someday, we might want to allow a stream request handler to be
710    // un-registered.  However, nothing in the Tor protocol requires it.
711    #[cfg(feature = "hs-service")]
712    pub async fn allow_stream_requests(
713        self: &Arc<ClientCirc>,
714        allow_commands: &[tor_cell::relaycell::RelayCmd],
715        hop_num: HopNum,
716        filter: impl crate::stream::IncomingStreamRequestFilter,
717    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
718        use futures::stream::StreamExt;
719
720        use crate::tunnel::HopLocation;
721
722        /// The size of the channel receiving IncomingStreamRequestContexts.
723        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
724
725        // TODO(#2002): support onion service conflux
726        let circ_count = self.mutable.n_legs();
727        if circ_count != 1 {
728            return Err(
729                internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
730            );
731        }
732
733        let time_prov = self.time_provider.clone();
734        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
735        let (incoming_sender, incoming_receiver) =
736            MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
737        let (tx, rx) = oneshot::channel();
738
739        self.command
740            .unbounded_send(CtrlCmd::AwaitStreamRequest {
741                cmd_checker,
742                incoming_sender,
743                hop_num,
744                done: tx,
745                filter: Box::new(filter),
746            })
747            .map_err(|_| Error::CircuitClosed)?;
748
749        // Check whether the AwaitStreamRequest was processed successfully.
750        rx.await.map_err(|_| Error::CircuitClosed)??;
751
752        let allowed_hop_num = hop_num;
753
754        let circ = Arc::clone(self);
755        Ok(incoming_receiver.map(move |req_ctx| {
756            let StreamReqInfo {
757                req,
758                stream_id,
759                hop_num,
760                leg,
761                receiver,
762                msg_tx,
763                memquota,
764                relay_cell_format,
765            } = req_ctx;
766
767            // We already enforce this in handle_incoming_stream_request; this
768            // assertion is just here to make sure that we don't ever
769            // accidentally remove or fail to enforce that check, since it is
770            // security-critical.
771            assert_eq!(allowed_hop_num, hop_num);
772
773            // TODO(#2002): figure out what this is going to look like
774            // for onion services (perhaps we should forbid this function
775            // from being called on a multipath circuit?)
776            //
777            // See also:
778            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
779            let target = StreamTarget {
780                circ: Arc::clone(&circ),
781                tx: msg_tx,
782                hop: HopLocation::Hop((leg, hop_num)),
783                stream_id,
784                relay_cell_format,
785            };
786
787            let reader = StreamReader {
788                target: target.clone(),
789                receiver,
790                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
791                ended: false,
792            };
793
794            IncomingStream::new(circ.time_provider.clone(), req, target, reader, memquota)
795        }))
796    }
797
798    /// Extend the circuit, via the most appropriate circuit extension handshake,
799    /// to the chosen `target` hop.
800    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
801    where
802        Tg: CircTarget,
803    {
804        // For now we use the simplest decision-making mechanism:
805        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
806        //
807        // This behavior is slightly different from C tor, which uses ntor v3
808        // only whenever it want to send any extension in the circuit message.
809        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
810        // want to use an extension if we can, and so it doesn't make too much
811        // sense to detect the case where we have no extensions.
812        //
813        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
814        // on the tor network, and so we cannot simply assume that everybody has it.)
815        if target
816            .protovers()
817            .supports_named_subver(named::RELAY_NTORV3)
818        {
819            self.extend_ntor_v3(target, params).await
820        } else {
821            self.extend_ntor(target, params).await
822        }
823    }
824
825    /// Extend the circuit via the ntor handshake to a new target last
826    /// hop.
827    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
828    where
829        Tg: CircTarget,
830    {
831        let key = NtorPublicKey {
832            id: *target
833                .rsa_identity()
834                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
835            pk: *target.ntor_onion_key(),
836        };
837        let mut linkspecs = target
838            .linkspecs()
839            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
840        if !params.extend_by_ed25519_id {
841            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
842        }
843
844        let (tx, rx) = oneshot::channel();
845
846        let peer_id = OwnedChanTarget::from_chan_target(target);
847        let settings =
848            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
849        self.control
850            .unbounded_send(CtrlMsg::ExtendNtor {
851                peer_id,
852                public_key: key,
853                linkspecs,
854                settings,
855                done: tx,
856            })
857            .map_err(|_| Error::CircuitClosed)?;
858
859        rx.await.map_err(|_| Error::CircuitClosed)??;
860
861        Ok(())
862    }
863
864    /// Extend the circuit via the ntor handshake to a new target last
865    /// hop.
866    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
867    where
868        Tg: CircTarget,
869    {
870        let key = NtorV3PublicKey {
871            id: *target
872                .ed_identity()
873                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
874            pk: *target.ntor_onion_key(),
875        };
876        let mut linkspecs = target
877            .linkspecs()
878            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
879        if !params.extend_by_ed25519_id {
880            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
881        }
882
883        let (tx, rx) = oneshot::channel();
884
885        let peer_id = OwnedChanTarget::from_chan_target(target);
886        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
887        self.control
888            .unbounded_send(CtrlMsg::ExtendNtorV3 {
889                peer_id,
890                public_key: key,
891                linkspecs,
892                settings,
893                done: tx,
894            })
895            .map_err(|_| Error::CircuitClosed)?;
896
897        rx.await.map_err(|_| Error::CircuitClosed)??;
898
899        Ok(())
900    }
901
902    /// Extend this circuit by a single, "virtual" hop.
903    ///
904    /// A virtual hop is one for which we do not add an actual network connection
905    /// between separate hosts (such as Relays).  We only add a layer of
906    /// cryptography.
907    ///
908    /// This is used to implement onion services: the client and the service
909    /// both build a circuit to a single rendezvous point, and tell the
910    /// rendezvous point to relay traffic between their two circuits.  Having
911    /// completed a [`handshake`] out of band[^1], the parties each extend their
912    /// circuits by a single "virtual" encryption hop that represents their
913    /// shared cryptographic context.
914    ///
915    /// Once a circuit has been extended in this way, it is an error to try to
916    /// extend it in any other way.
917    ///
918    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
919    ///     client sends their half of the handshake in an ` message, and the
920    ///     service's response is inline in its `RENDEZVOUS2` message.
921    //
922    // TODO hs: let's try to enforce the "you can't extend a circuit again once
923    // it has been extended this way" property.  We could do that with internal
924    // state, or some kind of a type state pattern.
925    #[cfg(feature = "hs-common")]
926    pub async fn extend_virtual(
927        &self,
928        protocol: handshake::RelayProtocol,
929        role: handshake::HandshakeRole,
930        seed: impl handshake::KeyGenerator,
931        params: &CircParameters,
932        capabilities: &tor_protover::Protocols,
933    ) -> Result<()> {
934        use self::handshake::BoxedClientLayer;
935
936        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
937        let relay_cell_format = protocol.relay_cell_format();
938
939        let BoxedClientLayer { fwd, back, binding } =
940            protocol.construct_client_layers(role, seed)?;
941
942        let settings = HopSettings::from_params_and_caps(params, capabilities)?
943            // TODO #2037: We _should_ support negotiation here; see #2037.
944            .without_negotiation();
945        let (tx, rx) = oneshot::channel();
946        let message = CtrlCmd::ExtendVirtual {
947            relay_cell_format,
948            cell_crypto: (fwd, back, binding),
949            settings,
950            done: tx,
951        };
952
953        self.command
954            .unbounded_send(message)
955            .map_err(|_| Error::CircuitClosed)?;
956
957        rx.await.map_err(|_| Error::CircuitClosed)?
958    }
959
960    /// Helper, used to begin a stream.
961    ///
962    /// This function allocates a stream ID, and sends the message
963    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
964    ///
965    /// The caller will typically want to see the first cell in response,
966    /// to see whether it is e.g. an END or a CONNECTED.
967    async fn begin_stream_impl(
968        self: &Arc<ClientCirc>,
969        begin_msg: AnyRelayMsg,
970        cmd_checker: AnyCmdChecker,
971    ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
972        // TODO: Possibly this should take a hop, rather than just
973        // assuming it's the last hop.
974        let hop = TargetHop::LastHop;
975
976        let time_prov = self.time_provider.clone();
977
978        let memquota = StreamAccount::new(self.mq_account())?;
979        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
980            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
981        let (tx, rx) = oneshot::channel();
982        let (msg_tx, msg_rx) =
983            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
984
985        self.control
986            .unbounded_send(CtrlMsg::BeginStream {
987                hop,
988                message: begin_msg,
989                sender,
990                rx: msg_rx,
991                done: tx,
992                cmd_checker,
993            })
994            .map_err(|_| Error::CircuitClosed)?;
995
996        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
997
998        let target = StreamTarget {
999            circ: self.clone(),
1000            tx: msg_tx,
1001            hop,
1002            stream_id,
1003            relay_cell_format,
1004        };
1005
1006        let reader = StreamReader {
1007            target: target.clone(),
1008            receiver,
1009            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
1010            ended: false,
1011        };
1012
1013        Ok((reader, target, memquota))
1014    }
1015
1016    /// Start a DataStream (anonymized connection) to the given
1017    /// address and port, using a BEGIN cell.
1018    async fn begin_data_stream(
1019        self: &Arc<ClientCirc>,
1020        msg: AnyRelayMsg,
1021        optimistic: bool,
1022    ) -> Result<DataStream> {
1023        let (reader, target, memquota) = self
1024            .begin_stream_impl(msg, DataCmdChecker::new_any())
1025            .await?;
1026        let mut stream = DataStream::new(self.time_provider.clone(), reader, target, memquota);
1027        if !optimistic {
1028            stream.wait_for_connection().await?;
1029        }
1030        Ok(stream)
1031    }
1032
1033    /// Start a stream to the given address and port, using a BEGIN
1034    /// cell.
1035    ///
1036    /// The use of a string for the address is intentional: you should let
1037    /// the remote Tor relay do the hostname lookup for you.
1038    pub async fn begin_stream(
1039        self: &Arc<ClientCirc>,
1040        target: &str,
1041        port: u16,
1042        parameters: Option<StreamParameters>,
1043    ) -> Result<DataStream> {
1044        let parameters = parameters.unwrap_or_default();
1045        let begin_flags = parameters.begin_flags();
1046        let optimistic = parameters.is_optimistic();
1047        let target = if parameters.suppressing_hostname() {
1048            ""
1049        } else {
1050            target
1051        };
1052        let beginmsg = Begin::new(target, port, begin_flags)
1053            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
1054        self.begin_data_stream(beginmsg.into(), optimistic).await
1055    }
1056
1057    /// Start a new stream to the last relay in the circuit, using
1058    /// a BEGIN_DIR cell.
1059    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
1060        // Note that we always open begindir connections optimistically.
1061        // Since they are local to a relay that we've already authenticated
1062        // with and built a circuit to, there should be no additional checks
1063        // we need to perform to see whether the BEGINDIR will succeed.
1064        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
1065            .await
1066    }
1067
1068    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
1069    /// in this circuit.
1070    ///
1071    /// Note that this function does not check for timeouts; that's
1072    /// the caller's responsibility.
1073    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
1074        let resolve_msg = Resolve::new(hostname);
1075
1076        let resolved_msg = self.try_resolve(resolve_msg).await?;
1077
1078        resolved_msg
1079            .into_answers()
1080            .into_iter()
1081            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1082                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1083                Ok(_) => None,
1084                Err(e) => Some(Err(e)),
1085            })
1086            .collect()
1087    }
1088
1089    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
1090    /// the last relay on this circuit.
1091    ///
1092    /// Note that this function does not check for timeouts; that's
1093    /// the caller's responsibility.
1094    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1095        let resolve_ptr_msg = Resolve::new_reverse(&addr);
1096
1097        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1098
1099        resolved_msg
1100            .into_answers()
1101            .into_iter()
1102            .filter_map(|(val, _)| match resolvedval_to_result(val) {
1103                Ok(ResolvedVal::Hostname(v)) => Some(
1104                    String::from_utf8(v)
1105                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1106                ),
1107                Ok(_) => None,
1108                Err(e) => Some(Err(e)),
1109            })
1110            .collect()
1111    }
1112
1113    /// Helper: Send the resolve message, and read resolved message from
1114    /// resolve stream.
1115    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1116        let (reader, _target, memquota) = self
1117            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1118            .await?;
1119        let mut resolve_stream = ResolveStream::new(reader, memquota);
1120        resolve_stream.read_msg().await
1121    }
1122
1123    /// Shut down this circuit, along with all streams that are using it.
1124    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
1125    /// immediately after this function returns!).
1126    ///
1127    /// Note that other references to this circuit may exist.  If they
1128    /// do, they will stop working after you call this function.
1129    ///
1130    /// It's not necessary to call this method if you're just done
1131    /// with a circuit: the circuit should close on its own once nothing
1132    /// is using it any more.
1133    pub fn terminate(&self) {
1134        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
1135    }
1136
1137    /// Called when a circuit-level protocol error has occurred and the
1138    /// circuit needs to shut down.
1139    ///
1140    /// This is a separate function because we may eventually want to have
1141    /// it do more than just shut down.
1142    ///
1143    /// As with `terminate`, this function is asynchronous.
1144    pub(crate) fn protocol_error(&self) {
1145        self.terminate();
1146    }
1147
1148    /// Return true if this circuit is closed and therefore unusable.
1149    pub fn is_closing(&self) -> bool {
1150        self.control.is_closed()
1151    }
1152
1153    /// Return a process-unique identifier for this circuit.
1154    pub fn unique_id(&self) -> UniqId {
1155        self.unique_id
1156    }
1157
1158    /// Return the number of hops in this circuit.
1159    ///
1160    /// NOTE: This function will currently return only the number of hops
1161    /// _currently_ in the circuit. If there is an extend operation in progress,
1162    /// the currently pending hop may or may not be counted, depending on whether
1163    /// the extend operation finishes before this call is done.
1164    pub fn n_hops(&self) -> Result<usize> {
1165        self.mutable
1166            .n_hops(self.unique_id)
1167            .map_err(|_| Error::CircuitClosed)
1168    }
1169
1170    /// Return a future that will resolve once this circuit has closed.
1171    ///
1172    /// Note that this method does not itself cause the circuit to shut down.
1173    ///
1174    /// TODO: Perhaps this should return some kind of status indication instead
1175    /// of just ()
1176    #[cfg(feature = "experimental-api")]
1177    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
1178        self.reactor_closed_rx.clone().map(|_| ())
1179    }
1180}
1181
1182/// Handle to use during an ongoing protocol exchange with a circuit's last hop
1183///
1184/// This is obtained from [`ClientCirc::start_conversation`],
1185/// and used to send messages to the last hop relay.
1186//
1187// TODO(conflux): this should use ClientTunnel, and it should be moved into
1188// the tunnel module.
1189#[cfg(feature = "send-control-msg")]
1190#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
1191pub struct Conversation<'r>(&'r ClientCirc);
1192
1193#[cfg(feature = "send-control-msg")]
1194#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
1195impl Conversation<'_> {
1196    /// Send a protocol message as part of an ad-hoc exchange
1197    ///
1198    /// Responses are handled by the `MsgHandler` set up
1199    /// when the `Conversation` was created.
1200    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
1201        self.send_internal(Some(msg), None).await
1202    }
1203
1204    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
1205    ///
1206    /// The guts of `start_conversation` and `Conversation::send_msg`
1207    pub(crate) async fn send_internal(
1208        &self,
1209        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
1210        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1211    ) -> Result<()> {
1212        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
1213        let (sender, receiver) = oneshot::channel();
1214
1215        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
1216            msg,
1217            handler,
1218            sender,
1219        };
1220        self.0
1221            .control
1222            .unbounded_send(ctrl_msg)
1223            .map_err(|_| Error::CircuitClosed)?;
1224
1225        receiver.await.map_err(|_| Error::CircuitClosed)?
1226    }
1227}
1228
1229impl PendingClientCirc {
1230    /// Instantiate a new circuit object: used from Channel::new_circ().
1231    ///
1232    /// Does not send a CREATE* cell on its own.
1233    ///
1234    ///
1235    pub(crate) fn new(
1236        id: CircId,
1237        channel: Arc<Channel>,
1238        createdreceiver: oneshot::Receiver<CreateResponse>,
1239        input: CircuitRxReceiver,
1240        unique_id: UniqId,
1241        runtime: DynTimeProvider,
1242        memquota: CircuitAccount,
1243    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1244        let time_provider = channel.time_provider().clone();
1245        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1246            Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1247
1248        let circuit = ClientCirc {
1249            mutable,
1250            unique_id,
1251            control: control_tx,
1252            command: command_tx,
1253            reactor_closed_rx: reactor_closed_rx.shared(),
1254            #[cfg(test)]
1255            circid: id,
1256            memquota,
1257            time_provider,
1258        };
1259
1260        let pending = PendingClientCirc {
1261            recvcreated: createdreceiver,
1262            circ: Arc::new(circuit),
1263        };
1264        (pending, reactor)
1265    }
1266
1267    /// Extract the process-unique identifier for this pending circuit.
1268    pub fn peek_unique_id(&self) -> UniqId {
1269        self.circ.unique_id
1270    }
1271
1272    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1273    /// first hop of this circuit.
1274    ///
1275    /// There's no authentication in CRATE_FAST,
1276    /// so we don't need to know whom we're connecting to: we're just
1277    /// connecting to whichever relay the channel is for.
1278    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1279        // We no nothing about this relay, so we assume it supports no protocol capabilities at all.
1280        //
1281        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
1282        let protocols = tor_protover::Protocols::new();
1283        let settings =
1284            HopSettings::from_params_and_caps(&params, &protocols)?.without_negotiation();
1285
1286        let (tx, rx) = oneshot::channel();
1287        self.circ
1288            .control
1289            .unbounded_send(CtrlMsg::Create {
1290                recv_created: self.recvcreated,
1291                handshake: CircuitHandshake::CreateFast,
1292                settings,
1293                done: tx,
1294            })
1295            .map_err(|_| Error::CircuitClosed)?;
1296
1297        rx.await.map_err(|_| Error::CircuitClosed)??;
1298
1299        Ok(self.circ)
1300    }
1301
1302    /// Use the most appropriate handshake to connect to the first hop of this circuit.
1303    ///
1304    /// Note that the provided 'target' must match the channel's target,
1305    /// or the handshake will fail.
1306    pub async fn create_firsthop<Tg>(
1307        self,
1308        target: &Tg,
1309        params: CircParameters,
1310    ) -> Result<Arc<ClientCirc>>
1311    where
1312        Tg: tor_linkspec::CircTarget,
1313    {
1314        // (See note in ClientCirc::extend.)
1315        if target
1316            .protovers()
1317            .supports_named_subver(named::RELAY_NTORV3)
1318        {
1319            self.create_firsthop_ntor_v3(target, params).await
1320        } else {
1321            self.create_firsthop_ntor(target, params).await
1322        }
1323    }
1324
1325    /// Use the ntor handshake to connect to the first hop of this circuit.
1326    ///
1327    /// Note that the provided 'target' must match the channel's target,
1328    /// or the handshake will fail.
1329    pub async fn create_firsthop_ntor<Tg>(
1330        self,
1331        target: &Tg,
1332        params: CircParameters,
1333    ) -> Result<Arc<ClientCirc>>
1334    where
1335        Tg: tor_linkspec::CircTarget,
1336    {
1337        let (tx, rx) = oneshot::channel();
1338        let settings =
1339            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
1340
1341        self.circ
1342            .control
1343            .unbounded_send(CtrlMsg::Create {
1344                recv_created: self.recvcreated,
1345                handshake: CircuitHandshake::Ntor {
1346                    public_key: NtorPublicKey {
1347                        id: *target
1348                            .rsa_identity()
1349                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1350                        pk: *target.ntor_onion_key(),
1351                    },
1352                    ed_identity: *target
1353                        .ed_identity()
1354                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1355                },
1356                settings,
1357                done: tx,
1358            })
1359            .map_err(|_| Error::CircuitClosed)?;
1360
1361        rx.await.map_err(|_| Error::CircuitClosed)??;
1362
1363        Ok(self.circ)
1364    }
1365
1366    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
1367    ///
1368    /// Assumes that the target supports ntor_v3. The caller should verify
1369    /// this before calling this function, e.g. by validating that the target
1370    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
1371    ///
1372    /// Note that the provided 'target' must match the channel's target,
1373    /// or the handshake will fail.
1374    pub async fn create_firsthop_ntor_v3<Tg>(
1375        self,
1376        target: &Tg,
1377        params: CircParameters,
1378    ) -> Result<Arc<ClientCirc>>
1379    where
1380        Tg: tor_linkspec::CircTarget,
1381    {
1382        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
1383        let (tx, rx) = oneshot::channel();
1384
1385        self.circ
1386            .control
1387            .unbounded_send(CtrlMsg::Create {
1388                recv_created: self.recvcreated,
1389                handshake: CircuitHandshake::NtorV3 {
1390                    public_key: NtorV3PublicKey {
1391                        id: *target
1392                            .ed_identity()
1393                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1394                        pk: *target.ntor_onion_key(),
1395                    },
1396                },
1397                settings,
1398                done: tx,
1399            })
1400            .map_err(|_| Error::CircuitClosed)?;
1401
1402        rx.await.map_err(|_| Error::CircuitClosed)??;
1403
1404        Ok(self.circ)
1405    }
1406}
1407
1408/// Convert a [`ResolvedVal`] into a Result, based on whether or not
1409/// it represents an error.
1410fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1411    match val {
1412        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1413        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1414        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1415        _ => Ok(val),
1416    }
1417}
1418
1419#[cfg(test)]
1420pub(crate) mod test {
1421    // @@ begin test lint list maintained by maint/add_warning @@
1422    #![allow(clippy::bool_assert_comparison)]
1423    #![allow(clippy::clone_on_copy)]
1424    #![allow(clippy::dbg_macro)]
1425    #![allow(clippy::mixed_attributes_style)]
1426    #![allow(clippy::print_stderr)]
1427    #![allow(clippy::print_stdout)]
1428    #![allow(clippy::single_char_pattern)]
1429    #![allow(clippy::unwrap_used)]
1430    #![allow(clippy::unchecked_duration_subtraction)]
1431    #![allow(clippy::useless_vec)]
1432    #![allow(clippy::needless_pass_by_value)]
1433    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1434
1435    use super::*;
1436    use crate::channel::OpenChanCellS2C;
1437    use crate::channel::{test::new_reactor, CodecError};
1438    use crate::congestion::test_utils::params::build_cc_vegas_params;
1439    use crate::crypto::cell::RelayCellBody;
1440    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1441    #[cfg(feature = "hs-service")]
1442    use crate::stream::IncomingStreamRequestFilter;
1443    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1444    use futures::channel::mpsc::{Receiver, Sender};
1445    use futures::io::{AsyncReadExt, AsyncWriteExt};
1446    use futures::sink::SinkExt;
1447    use futures::stream::StreamExt;
1448    use futures::task::SpawnExt;
1449    use hex_literal::hex;
1450    use std::collections::{HashMap, VecDeque};
1451    use std::fmt::Debug;
1452    use std::time::Duration;
1453    use tor_basic_utils::test_rng::testing_rng;
1454    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
1455    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1456    use tor_cell::relaycell::msg::SendmeTag;
1457    use tor_cell::relaycell::{
1458        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1459    };
1460    use tor_linkspec::OwnedCircTarget;
1461    use tor_memquota::HasMemoryCost;
1462    use tor_rtcompat::Runtime;
1463    use tracing::trace;
1464    use tracing_test::traced_test;
1465
1466    #[cfg(feature = "conflux")]
1467    use {
1468        crate::tunnel::reactor::ConfluxHandshakeResult,
1469        crate::util::err::ConfluxHandshakeError,
1470        std::result::Result as StdResult,
1471        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1472        tor_cell::relaycell::msg::ConfluxLink,
1473        tor_rtmock::MockRuntime,
1474    };
1475
1476    impl PendingClientCirc {
1477        /// Testing only: Extract the circuit ID for this pending circuit.
1478        pub(crate) fn peek_circid(&self) -> CircId {
1479            self.circ.circid
1480        }
1481    }
1482
1483    impl ClientCirc {
1484        /// Testing only: Extract the circuit ID of this circuit.
1485        pub(crate) fn peek_circid(&self) -> CircId {
1486            self.circid
1487        }
1488    }
1489
1490    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1491        // TODO #1947: test other formats.
1492        let rfmt = RelayCellFormat::V0;
1493        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1494            .encode(rfmt, &mut testing_rng())
1495            .unwrap();
1496        let chanmsg = chanmsg::Relay::from(body);
1497        ClientCircChanMsg::Relay(chanmsg)
1498    }
1499
1500    // Example relay IDs and keys
1501    const EXAMPLE_SK: [u8; 32] =
1502        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1503    const EXAMPLE_PK: [u8; 32] =
1504        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1505    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1506    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1507
1508    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
1509    #[cfg(test)]
1510    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
1511        buffer: usize,
1512    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
1513        crate::fake_mpsc(buffer)
1514    }
1515
1516    /// return an example OwnedCircTarget that can get used for an ntor handshake.
1517    fn example_target() -> OwnedCircTarget {
1518        let mut builder = OwnedCircTarget::builder();
1519        builder
1520            .chan_target()
1521            .ed_identity(EXAMPLE_ED_ID.into())
1522            .rsa_identity(EXAMPLE_RSA_ID.into());
1523        builder
1524            .ntor_onion_key(EXAMPLE_PK.into())
1525            .protocols("FlowCtrl=1-2".parse().unwrap())
1526            .build()
1527            .unwrap()
1528    }
1529    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1530        crate::crypto::handshake::ntor::NtorSecretKey::new(
1531            EXAMPLE_SK.into(),
1532            EXAMPLE_PK.into(),
1533            EXAMPLE_RSA_ID.into(),
1534        )
1535    }
1536    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1537        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1538            EXAMPLE_SK.into(),
1539            EXAMPLE_PK.into(),
1540            EXAMPLE_ED_ID.into(),
1541        )
1542    }
1543
1544    fn working_fake_channel<R: Runtime>(
1545        rt: &R,
1546    ) -> (
1547        Arc<Channel>,
1548        Receiver<AnyChanCell>,
1549        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1550    ) {
1551        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1552        rt.spawn(async {
1553            let _ignore = chan_reactor.run().await;
1554        })
1555        .unwrap();
1556        (channel, rx, tx)
1557    }
1558
1559    /// Which handshake type to use.
1560    #[derive(Copy, Clone)]
1561    enum HandshakeType {
1562        Fast,
1563        Ntor,
1564        NtorV3,
1565    }
1566
1567    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1568        // We want to try progressing from a pending circuit to a circuit
1569        // via a crate_fast handshake.
1570
1571        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1572
1573        let (chan, mut rx, _sink) = working_fake_channel(rt);
1574        let circid = CircId::new(128).unwrap();
1575        let (created_send, created_recv) = oneshot::channel();
1576        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1577        let unique_id = UniqId::new(23, 17);
1578
1579        let (pending, reactor) = PendingClientCirc::new(
1580            circid,
1581            chan,
1582            created_recv,
1583            circmsg_recv,
1584            unique_id,
1585            DynTimeProvider::new(rt.clone()),
1586            CircuitAccount::new_noop(),
1587        );
1588
1589        rt.spawn(async {
1590            let _ignore = reactor.run().await;
1591        })
1592        .unwrap();
1593
1594        // Future to pretend to be a relay on the other end of the circuit.
1595        let simulate_relay_fut = async move {
1596            let mut rng = testing_rng();
1597            let create_cell = rx.next().await.unwrap();
1598            assert_eq!(create_cell.circid(), Some(circid));
1599            let reply = match handshake_type {
1600                HandshakeType::Fast => {
1601                    let cf = match create_cell.msg() {
1602                        AnyChanMsg::CreateFast(cf) => cf,
1603                        other => panic!("{:?}", other),
1604                    };
1605                    let (_, rep) = CreateFastServer::server(
1606                        &mut rng,
1607                        &mut |_: &()| Some(()),
1608                        &[()],
1609                        cf.handshake(),
1610                    )
1611                    .unwrap();
1612                    CreateResponse::CreatedFast(CreatedFast::new(rep))
1613                }
1614                HandshakeType::Ntor => {
1615                    let c2 = match create_cell.msg() {
1616                        AnyChanMsg::Create2(c2) => c2,
1617                        other => panic!("{:?}", other),
1618                    };
1619                    let (_, rep) = NtorServer::server(
1620                        &mut rng,
1621                        &mut |_: &()| Some(()),
1622                        &[example_ntor_key()],
1623                        c2.body(),
1624                    )
1625                    .unwrap();
1626                    CreateResponse::Created2(Created2::new(rep))
1627                }
1628                HandshakeType::NtorV3 => {
1629                    let c2 = match create_cell.msg() {
1630                        AnyChanMsg::Create2(c2) => c2,
1631                        other => panic!("{:?}", other),
1632                    };
1633                    let mut reply_fn = if with_cc {
1634                        |client_exts: &[CircRequestExt]| {
1635                            let _ = client_exts
1636                                .iter()
1637                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1638                                .expect("Client failed to request CC");
1639                            // This needs to be aligned to test_utils params
1640                            // value due to validation that needs it in range.
1641                            Some(vec![CircResponseExt::CcResponse(
1642                                extend_ext::CcResponse::new(31),
1643                            )])
1644                        }
1645                    } else {
1646                        |_: &_| Some(vec![])
1647                    };
1648                    let (_, rep) = NtorV3Server::server(
1649                        &mut rng,
1650                        &mut reply_fn,
1651                        &[example_ntor_v3_key()],
1652                        c2.body(),
1653                    )
1654                    .unwrap();
1655                    CreateResponse::Created2(Created2::new(rep))
1656                }
1657            };
1658            created_send.send(reply).unwrap();
1659        };
1660        // Future to pretend to be a client.
1661        let client_fut = async move {
1662            let target = example_target();
1663            let params = CircParameters::default();
1664            let ret = match handshake_type {
1665                HandshakeType::Fast => {
1666                    trace!("doing fast create");
1667                    pending.create_firsthop_fast(params).await
1668                }
1669                HandshakeType::Ntor => {
1670                    trace!("doing ntor create");
1671                    pending.create_firsthop_ntor(&target, params).await
1672                }
1673                HandshakeType::NtorV3 => {
1674                    let params = if with_cc {
1675                        // Setup CC vegas parameters.
1676                        CircParameters::new(true, build_cc_vegas_params())
1677                    } else {
1678                        params
1679                    };
1680                    trace!("doing ntor_v3 create");
1681                    pending.create_firsthop_ntor_v3(&target, params).await
1682                }
1683            };
1684            trace!("create done: result {:?}", ret);
1685            ret
1686        };
1687
1688        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1689
1690        let _circ = circ.unwrap();
1691
1692        // pfew!  We've build a circuit!  Let's make sure it has one hop.
1693        assert_eq!(_circ.n_hops().unwrap(), 1);
1694    }
1695
1696    #[traced_test]
1697    #[test]
1698    fn test_create_fast() {
1699        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1700            test_create(&rt, HandshakeType::Fast, false).await;
1701        });
1702    }
1703    #[traced_test]
1704    #[test]
1705    fn test_create_ntor() {
1706        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1707            test_create(&rt, HandshakeType::Ntor, false).await;
1708        });
1709    }
1710    #[traced_test]
1711    #[test]
1712    fn test_create_ntor_v3() {
1713        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1714            test_create(&rt, HandshakeType::NtorV3, false).await;
1715        });
1716    }
1717    #[traced_test]
1718    #[test]
1719    #[cfg(feature = "flowctl-cc")]
1720    fn test_create_ntor_v3_with_cc() {
1721        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1722            test_create(&rt, HandshakeType::NtorV3, true).await;
1723        });
1724    }
1725
1726    // An encryption layer that doesn't do any crypto.   Can be used
1727    // as inbound or outbound, but not both at once.
1728    pub(crate) struct DummyCrypto {
1729        counter_tag: [u8; 20],
1730        counter: u32,
1731        lasthop: bool,
1732    }
1733    impl DummyCrypto {
1734        fn next_tag(&mut self) -> SendmeTag {
1735            #![allow(clippy::identity_op)]
1736            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1737            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1738            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1739            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1740            self.counter += 1;
1741            self.counter_tag.into()
1742        }
1743    }
1744
1745    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1746        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1747            self.next_tag()
1748        }
1749        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1750    }
1751    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1752        fn decrypt_inbound(
1753            &mut self,
1754            _cmd: ChanCmd,
1755            _cell: &mut RelayCellBody,
1756        ) -> Option<SendmeTag> {
1757            if self.lasthop {
1758                Some(self.next_tag())
1759            } else {
1760                None
1761            }
1762        }
1763    }
1764    impl DummyCrypto {
1765        pub(crate) fn new(lasthop: bool) -> Self {
1766            DummyCrypto {
1767                counter_tag: [0; 20],
1768                counter: 0,
1769                lasthop,
1770            }
1771        }
1772    }
1773
1774    // Helper: set up a 3-hop circuit with no encryption, where the
1775    // next inbound message seems to come from hop next_msg_from
1776    async fn newcirc_ext<R: Runtime>(
1777        rt: &R,
1778        unique_id: UniqId,
1779        chan: Arc<Channel>,
1780        hops: Vec<path::HopDetail>,
1781        next_msg_from: HopNum,
1782        params: CircParameters,
1783    ) -> (Arc<ClientCirc>, CircuitRxSender) {
1784        let circid = CircId::new(128).unwrap();
1785        let (_created_send, created_recv) = oneshot::channel();
1786        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1787
1788        let (pending, reactor) = PendingClientCirc::new(
1789            circid,
1790            chan,
1791            created_recv,
1792            circmsg_recv,
1793            unique_id,
1794            DynTimeProvider::new(rt.clone()),
1795            CircuitAccount::new_noop(),
1796        );
1797
1798        rt.spawn(async {
1799            let _ignore = reactor.run().await;
1800        })
1801        .unwrap();
1802
1803        let PendingClientCirc {
1804            circ,
1805            recvcreated: _,
1806        } = pending;
1807
1808        // TODO #1067: Support other formats
1809        let relay_cell_format = RelayCellFormat::V0;
1810
1811        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1812        for (idx, peer_id) in hops.into_iter().enumerate() {
1813            let (tx, rx) = oneshot::channel();
1814            let idx = idx as u8;
1815
1816            circ.command
1817                .unbounded_send(CtrlCmd::AddFakeHop {
1818                    relay_cell_format,
1819                    fwd_lasthop: idx == last_hop_num,
1820                    rev_lasthop: idx == u8::from(next_msg_from),
1821                    peer_id,
1822                    params: params.clone(),
1823                    done: tx,
1824                })
1825                .unwrap();
1826            rx.await.unwrap().unwrap();
1827        }
1828
1829        (circ, circmsg_send)
1830    }
1831
1832    // Helper: set up a 3-hop circuit with no encryption, where the
1833    // next inbound message seems to come from hop next_msg_from
1834    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1835        let hops = std::iter::repeat_with(|| {
1836            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1837                .ed_identity([4; 32].into())
1838                .rsa_identity([5; 20].into())
1839                .build()
1840                .expect("Could not construct fake hop");
1841
1842            path::HopDetail::Relay(peer_id)
1843        })
1844        .take(3)
1845        .collect();
1846
1847        let unique_id = UniqId::new(23, 17);
1848        newcirc_ext(
1849            rt,
1850            unique_id,
1851            chan,
1852            hops,
1853            2.into(),
1854            CircParameters::default(),
1855        )
1856        .await
1857    }
1858
1859    /// Create `n` distinct [`path::HopDetail`]s,
1860    /// with the specified `start_idx` for the dummy identities.
1861    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1862        (0..n)
1863            .map(|idx| {
1864                let peer_id = tor_linkspec::OwnedChanTarget::builder()
1865                    .ed_identity([idx + start_idx; 32].into())
1866                    .rsa_identity([idx + start_idx + 1; 20].into())
1867                    .build()
1868                    .expect("Could not construct fake hop");
1869
1870                path::HopDetail::Relay(peer_id)
1871            })
1872            .collect()
1873    }
1874
1875    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1876        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1877
1878        let (chan, mut rx, _sink) = working_fake_channel(rt);
1879        let (circ, mut sink) = newcirc(rt, chan).await;
1880        let circid = circ.peek_circid();
1881        let params = CircParameters::default();
1882
1883        let extend_fut = async move {
1884            let target = example_target();
1885            match handshake_type {
1886                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1887                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1888                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1889            };
1890            circ // gotta keep the circ alive, or the reactor would exit.
1891        };
1892        let reply_fut = async move {
1893            // We've disabled encryption on this circuit, so we can just
1894            // read the extend2 cell.
1895            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1896            assert_eq!(id, Some(circid));
1897            let rmsg = match chmsg {
1898                AnyChanMsg::RelayEarly(r) => {
1899                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1900                        .unwrap()
1901                }
1902                other => panic!("{:?}", other),
1903            };
1904            let e2 = match rmsg.msg() {
1905                AnyRelayMsg::Extend2(e2) => e2,
1906                other => panic!("{:?}", other),
1907            };
1908            let mut rng = testing_rng();
1909            let reply = match handshake_type {
1910                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1911                HandshakeType::Ntor => {
1912                    let (_keygen, reply) = NtorServer::server(
1913                        &mut rng,
1914                        &mut |_: &()| Some(()),
1915                        &[example_ntor_key()],
1916                        e2.handshake(),
1917                    )
1918                    .unwrap();
1919                    reply
1920                }
1921                HandshakeType::NtorV3 => {
1922                    let (_keygen, reply) = NtorV3Server::server(
1923                        &mut rng,
1924                        &mut |_: &[CircRequestExt]| Some(vec![]),
1925                        &[example_ntor_v3_key()],
1926                        e2.handshake(),
1927                    )
1928                    .unwrap();
1929                    reply
1930                }
1931            };
1932
1933            let extended2 = relaymsg::Extended2::new(reply).into();
1934            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
1935            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
1936        };
1937
1938        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1939
1940        // Did we really add another hop?
1941        assert_eq!(circ.n_hops().unwrap(), 4);
1942
1943        // Do the path accessors report a reasonable outcome?
1944        {
1945            let path = circ.path_ref().unwrap();
1946            let path = path
1947                .all_hops()
1948                .filter_map(|hop| match hop {
1949                    path::HopDetail::Relay(r) => Some(r),
1950                    #[cfg(feature = "hs-common")]
1951                    path::HopDetail::Virtual => None,
1952                })
1953                .collect::<Vec<_>>();
1954
1955            assert_eq!(path.len(), 4);
1956            use tor_linkspec::HasRelayIds;
1957            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1958            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1959        }
1960        {
1961            let path = circ.path_ref().unwrap();
1962            assert_eq!(path.n_hops(), 4);
1963            use tor_linkspec::HasRelayIds;
1964            assert_eq!(
1965                path.hops()[3].as_chan_target().unwrap().ed_identity(),
1966                example_target().ed_identity()
1967            );
1968            assert_ne!(
1969                path.hops()[0].as_chan_target().unwrap().ed_identity(),
1970                example_target().ed_identity()
1971            );
1972        }
1973    }
1974
1975    #[traced_test]
1976    #[test]
1977    fn test_extend_ntor() {
1978        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1979            test_extend(&rt, HandshakeType::Ntor).await;
1980        });
1981    }
1982
1983    #[traced_test]
1984    #[test]
1985    fn test_extend_ntor_v3() {
1986        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1987            test_extend(&rt, HandshakeType::NtorV3).await;
1988        });
1989    }
1990
1991    async fn bad_extend_test_impl<R: Runtime>(
1992        rt: &R,
1993        reply_hop: HopNum,
1994        bad_reply: ClientCircChanMsg,
1995    ) -> Error {
1996        let (chan, _rx, _sink) = working_fake_channel(rt);
1997        let hops = std::iter::repeat_with(|| {
1998            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1999                .ed_identity([4; 32].into())
2000                .rsa_identity([5; 20].into())
2001                .build()
2002                .expect("Could not construct fake hop");
2003
2004            path::HopDetail::Relay(peer_id)
2005        })
2006        .take(3)
2007        .collect();
2008
2009        let unique_id = UniqId::new(23, 17);
2010        let (circ, mut sink) = newcirc_ext(
2011            rt,
2012            unique_id,
2013            chan,
2014            hops,
2015            reply_hop,
2016            CircParameters::default(),
2017        )
2018        .await;
2019        let params = CircParameters::default();
2020
2021        let target = example_target();
2022        #[allow(clippy::clone_on_copy)]
2023        let rtc = rt.clone();
2024        let sink_handle = rt
2025            .spawn_with_handle(async move {
2026                rtc.sleep(Duration::from_millis(100)).await;
2027                sink.send(bad_reply).await.unwrap();
2028                sink
2029            })
2030            .unwrap();
2031        let outcome = circ.extend_ntor(&target, params).await;
2032        let _sink = sink_handle.await;
2033
2034        assert_eq!(circ.n_hops().unwrap(), 3);
2035        assert!(outcome.is_err());
2036        outcome.unwrap_err()
2037    }
2038
2039    #[traced_test]
2040    #[test]
2041    fn bad_extend_wronghop() {
2042        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2043            let extended2 = relaymsg::Extended2::new(vec![]).into();
2044            let cc = rmsg_to_ccmsg(None, extended2);
2045
2046            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
2047            // This case shows up as a CircDestroy, since a message sent
2048            // from the wrong hop won't even be delivered to the extend
2049            // code's meta-handler.  Instead the unexpected message will cause
2050            // the circuit to get torn down.
2051            match error {
2052                Error::CircuitClosed => {}
2053                x => panic!("got other error: {}", x),
2054            }
2055        });
2056    }
2057
2058    #[traced_test]
2059    #[test]
2060    fn bad_extend_wrongtype() {
2061        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2062            let extended = relaymsg::Extended::new(vec![7; 200]).into();
2063            let cc = rmsg_to_ccmsg(None, extended);
2064
2065            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2066            match error {
2067                Error::BytesErr {
2068                    err: tor_bytes::Error::InvalidMessage(_),
2069                    object: "extended2 message",
2070                } => {}
2071                other => panic!("{:?}", other),
2072            }
2073        });
2074    }
2075
2076    #[traced_test]
2077    #[test]
2078    fn bad_extend_destroy() {
2079        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2080            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
2081            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2082            match error {
2083                Error::CircuitClosed => {}
2084                other => panic!("{:?}", other),
2085            }
2086        });
2087    }
2088
2089    #[traced_test]
2090    #[test]
2091    fn bad_extend_crypto() {
2092        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2093            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
2094            let cc = rmsg_to_ccmsg(None, extended2);
2095            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2096            assert!(matches!(error, Error::BadCircHandshakeAuth));
2097        });
2098    }
2099
2100    #[traced_test]
2101    #[test]
2102    fn begindir() {
2103        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2104            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2105            let (circ, mut sink) = newcirc(&rt, chan).await;
2106            let circid = circ.peek_circid();
2107
2108            let begin_and_send_fut = async move {
2109                // Here we'll say we've got a circuit, and we want to
2110                // make a simple BEGINDIR request with it.
2111                let mut stream = circ.begin_dir_stream().await.unwrap();
2112                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
2113                stream.flush().await.unwrap();
2114                let mut buf = [0_u8; 1024];
2115                let n = stream.read(&mut buf).await.unwrap();
2116                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
2117                let n = stream.read(&mut buf).await.unwrap();
2118                assert_eq!(n, 0);
2119                stream
2120            };
2121            let reply_fut = async move {
2122                // We've disabled encryption on this circuit, so we can just
2123                // read the begindir cell.
2124                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2125                assert_eq!(id, Some(circid));
2126                let rmsg = match chmsg {
2127                    AnyChanMsg::Relay(r) => {
2128                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2129                            .unwrap()
2130                    }
2131                    other => panic!("{:?}", other),
2132                };
2133                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2134                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
2135
2136                // Reply with a Connected cell to indicate success.
2137                let connected = relaymsg::Connected::new_empty().into();
2138                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2139
2140                // Now read a DATA cell...
2141                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2142                assert_eq!(id, Some(circid));
2143                let rmsg = match chmsg {
2144                    AnyChanMsg::Relay(r) => {
2145                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2146                            .unwrap()
2147                    }
2148                    other => panic!("{:?}", other),
2149                };
2150                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
2151                assert_eq!(streamid_2, streamid);
2152                if let AnyRelayMsg::Data(d) = rmsg {
2153                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
2154                } else {
2155                    panic!();
2156                }
2157
2158                // Write another data cell in reply!
2159                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
2160                    .unwrap()
2161                    .into();
2162                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2163
2164                // Send an END cell to say that the conversation is over.
2165                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2166                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2167
2168                (rx, sink) // gotta keep these alive, or the reactor will exit.
2169            };
2170
2171            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2172        });
2173    }
2174
2175    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
2176    fn close_stream_helper(by_drop: bool) {
2177        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2178            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2179            let (circ, mut sink) = newcirc(&rt, chan).await;
2180
2181            let stream_fut = async move {
2182                let stream = circ
2183                    .begin_stream("www.example.com", 80, None)
2184                    .await
2185                    .unwrap();
2186
2187                let (r, mut w) = stream.split();
2188                if by_drop {
2189                    // Drop the writer and the reader, which should close the stream.
2190                    drop(r);
2191                    drop(w);
2192                    (None, circ) // make sure to keep the circuit alive
2193                } else {
2194                    // Call close on the writer, while keeping the reader alive.
2195                    w.close().await.unwrap();
2196                    (Some(r), circ)
2197                }
2198            };
2199            let handler_fut = async {
2200                // Read the BEGIN message.
2201                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2202                let rmsg = match msg {
2203                    AnyChanMsg::Relay(r) => {
2204                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2205                            .unwrap()
2206                    }
2207                    other => panic!("{:?}", other),
2208                };
2209                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2210                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2211
2212                // Reply with a CONNECTED.
2213                let connected =
2214                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2215                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2216
2217                // Expect an END.
2218                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2219                let rmsg = match msg {
2220                    AnyChanMsg::Relay(r) => {
2221                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2222                            .unwrap()
2223                    }
2224                    other => panic!("{:?}", other),
2225                };
2226                let (_, rmsg) = rmsg.into_streamid_and_msg();
2227                assert_eq!(rmsg.cmd(), RelayCmd::END);
2228
2229                (rx, sink) // keep these alive or the reactor will exit.
2230            };
2231
2232            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2233        });
2234    }
2235
2236    #[traced_test]
2237    #[test]
2238    fn drop_stream() {
2239        close_stream_helper(true);
2240    }
2241
2242    #[traced_test]
2243    #[test]
2244    fn close_stream() {
2245        close_stream_helper(false);
2246    }
2247
2248    // Set up a circuit and stream that expects some incoming SENDMEs.
2249    async fn setup_incoming_sendme_case<R: Runtime>(
2250        rt: &R,
2251        n_to_send: usize,
2252    ) -> (
2253        Arc<ClientCirc>,
2254        DataStream,
2255        CircuitRxSender,
2256        Option<StreamId>,
2257        usize,
2258        Receiver<AnyChanCell>,
2259        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2260    ) {
2261        let (chan, mut rx, sink2) = working_fake_channel(rt);
2262        let (circ, mut sink) = newcirc(rt, chan).await;
2263        let circid = circ.peek_circid();
2264
2265        let begin_and_send_fut = {
2266            let circ = circ.clone();
2267            async move {
2268                // Take our circuit and make a stream on it.
2269                let mut stream = circ
2270                    .begin_stream("www.example.com", 443, None)
2271                    .await
2272                    .unwrap();
2273                let junk = [0_u8; 1024];
2274                let mut remaining = n_to_send;
2275                while remaining > 0 {
2276                    let n = std::cmp::min(remaining, junk.len());
2277                    stream.write_all(&junk[..n]).await.unwrap();
2278                    remaining -= n;
2279                }
2280                stream.flush().await.unwrap();
2281                stream
2282            }
2283        };
2284
2285        let receive_fut = async move {
2286            // Read the begin cell.
2287            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2288            let rmsg = match chmsg {
2289                AnyChanMsg::Relay(r) => {
2290                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2291                        .unwrap()
2292                }
2293                other => panic!("{:?}", other),
2294            };
2295            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2296            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2297            // Reply with a connected cell...
2298            let connected = relaymsg::Connected::new_empty().into();
2299            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2300            // Now read bytes from the stream until we have them all.
2301            let mut bytes_received = 0_usize;
2302            let mut cells_received = 0_usize;
2303            while bytes_received < n_to_send {
2304                // Read a data cell, and remember how much we got.
2305                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2306                assert_eq!(id, Some(circid));
2307
2308                let rmsg = match chmsg {
2309                    AnyChanMsg::Relay(r) => {
2310                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2311                            .unwrap()
2312                    }
2313                    other => panic!("{:?}", other),
2314                };
2315                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2316                assert_eq!(streamid2, streamid);
2317                if let AnyRelayMsg::Data(dat) = rmsg {
2318                    cells_received += 1;
2319                    bytes_received += dat.as_ref().len();
2320                } else {
2321                    panic!();
2322                }
2323            }
2324
2325            (sink, streamid, cells_received, rx)
2326        };
2327
2328        let (stream, (sink, streamid, cells_received, rx)) =
2329            futures::join!(begin_and_send_fut, receive_fut);
2330
2331        (circ, stream, sink, streamid, cells_received, rx, sink2)
2332    }
2333
2334    #[traced_test]
2335    #[test]
2336    fn accept_valid_sendme() {
2337        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2338            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2339                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2340
2341            assert_eq!(cells_received, 301);
2342
2343            // Make sure that the circuit is indeed expecting the right sendmes
2344            {
2345                let (tx, rx) = oneshot::channel();
2346                circ.command
2347                    .unbounded_send(CtrlCmd::QuerySendWindow {
2348                        hop: 2.into(),
2349                        done: tx,
2350                    })
2351                    .unwrap();
2352                let (window, tags) = rx.await.unwrap().unwrap();
2353                assert_eq!(window, 1000 - 301);
2354                assert_eq!(tags.len(), 3);
2355                // 100
2356                assert_eq!(
2357                    tags[0],
2358                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2359                );
2360                // 200
2361                assert_eq!(
2362                    tags[1],
2363                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2364                );
2365                // 300
2366                assert_eq!(
2367                    tags[2],
2368                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2369                );
2370            }
2371
2372            let reply_with_sendme_fut = async move {
2373                // make and send a circuit-level sendme.
2374                let c_sendme =
2375                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2376                        .into();
2377                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2378
2379                // Make and send a stream-level sendme.
2380                let s_sendme = relaymsg::Sendme::new_empty().into();
2381                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2382
2383                sink
2384            };
2385
2386            let _sink = reply_with_sendme_fut.await;
2387
2388            rt.advance_until_stalled().await;
2389
2390            // Now make sure that the circuit is still happy, and its
2391            // window is updated.
2392            {
2393                let (tx, rx) = oneshot::channel();
2394                circ.command
2395                    .unbounded_send(CtrlCmd::QuerySendWindow {
2396                        hop: 2.into(),
2397                        done: tx,
2398                    })
2399                    .unwrap();
2400                let (window, _tags) = rx.await.unwrap().unwrap();
2401                assert_eq!(window, 1000 - 201);
2402            }
2403        });
2404    }
2405
2406    #[traced_test]
2407    #[test]
2408    fn invalid_circ_sendme() {
2409        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2410            // Same setup as accept_valid_sendme() test above but try giving
2411            // a sendme with the wrong tag.
2412
2413            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2414                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2415
2416            let reply_with_sendme_fut = async move {
2417                // make and send a circuit-level sendme with a bad tag.
2418                let c_sendme =
2419                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2420                        .into();
2421                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2422                sink
2423            };
2424
2425            let _sink = reply_with_sendme_fut.await;
2426
2427            // Check whether the reactor dies as a result of receiving invalid data.
2428            rt.advance_until_stalled().await;
2429            assert!(circ.is_closing());
2430        });
2431    }
2432
2433    #[traced_test]
2434    #[test]
2435    fn test_busy_stream_fairness() {
2436        // Number of streams to use.
2437        const N_STREAMS: usize = 3;
2438        // Number of cells (roughly) for each stream to send.
2439        const N_CELLS: usize = 20;
2440        // Number of bytes that *each* stream will send, and that we'll read
2441        // from the channel.
2442        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2443        // Ignoring cell granularity, with perfect fairness we'd expect
2444        // `N_BYTES/N_STREAMS` bytes from each stream.
2445        //
2446        // We currently allow for up to a full cell less than that.  This is
2447        // somewhat arbitrary and can be changed as needed, since we don't
2448        // provide any specific fairness guarantees.
2449        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2450            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2451
2452        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2453            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2454            let (circ, mut sink) = newcirc(&rt, chan).await;
2455
2456            // Run clients in a single task, doing our own round-robin
2457            // scheduling of writes to the reactor. Conversely, if we were to
2458            // put each client in its own task, we would be at the the mercy of
2459            // how fairly the runtime schedules the client tasks, which is outside
2460            // the scope of this test.
2461            rt.spawn({
2462                // Clone the circuit to keep it alive after writers have
2463                // finished with it.
2464                let circ = circ.clone();
2465                async move {
2466                    let mut clients = VecDeque::new();
2467                    struct Client {
2468                        stream: DataStream,
2469                        to_write: &'static [u8],
2470                    }
2471                    for _ in 0..N_STREAMS {
2472                        clients.push_back(Client {
2473                            stream: circ
2474                                .begin_stream("www.example.com", 80, None)
2475                                .await
2476                                .unwrap(),
2477                            to_write: &[0_u8; N_BYTES][..],
2478                        });
2479                    }
2480                    while let Some(mut client) = clients.pop_front() {
2481                        if client.to_write.is_empty() {
2482                            // Client is done. Don't put back in queue.
2483                            continue;
2484                        }
2485                        let written = client.stream.write(client.to_write).await.unwrap();
2486                        client.to_write = &client.to_write[written..];
2487                        clients.push_back(client);
2488                    }
2489                }
2490            })
2491            .unwrap();
2492
2493            let channel_handler_fut = async {
2494                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2495                let mut total_bytes_received = 0;
2496
2497                loop {
2498                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2499                    let rmsg = match msg {
2500                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2501                            RelayCellFormat::V0,
2502                            r.into_relay_body(),
2503                        )
2504                        .unwrap(),
2505                        other => panic!("Unexpected chanmsg: {other:?}"),
2506                    };
2507                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2508                    match rmsg.cmd() {
2509                        RelayCmd::BEGIN => {
2510                            // Add an entry for this stream.
2511                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2512                            assert_eq!(prev, None);
2513                            // Reply with a CONNECTED.
2514                            let connected = relaymsg::Connected::new_with_addr(
2515                                "10.0.0.1".parse().unwrap(),
2516                                1234,
2517                            )
2518                            .into();
2519                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2520                        }
2521                        RelayCmd::DATA => {
2522                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2523                            let nbytes = data_msg.as_ref().len();
2524                            total_bytes_received += nbytes;
2525                            let streamid = streamid.unwrap();
2526                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2527                            *stream_bytes += nbytes;
2528                            if total_bytes_received >= N_BYTES {
2529                                break;
2530                            }
2531                        }
2532                        RelayCmd::END => {
2533                            // Stream is done. If fair scheduling is working as
2534                            // expected we *probably* shouldn't get here, but we
2535                            // can ignore it and save the failure until we
2536                            // actually have the final stats.
2537                            continue;
2538                        }
2539                        other => {
2540                            panic!("Unexpected command {other:?}");
2541                        }
2542                    }
2543                }
2544
2545                // Return our stats, along with the `rx` and `sink` to keep the
2546                // reactor alive (since clients could still be writing).
2547                (total_bytes_received, stream_bytes_received, rx, sink)
2548            };
2549
2550            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2551                channel_handler_fut.await;
2552            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2553            for (sid, stream_bytes) in stream_bytes_received {
2554                assert!(
2555                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2556                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2557                );
2558            }
2559        });
2560    }
2561
2562    #[test]
2563    fn basic_params() {
2564        use super::CircParameters;
2565        let mut p = CircParameters::default();
2566        assert!(p.extend_by_ed25519_id);
2567
2568        p.extend_by_ed25519_id = false;
2569        assert!(!p.extend_by_ed25519_id);
2570    }
2571
2572    #[cfg(feature = "hs-service")]
2573    struct AllowAllStreamsFilter;
2574    #[cfg(feature = "hs-service")]
2575    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2576        fn disposition(
2577            &mut self,
2578            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
2579            _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
2580        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
2581            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
2582        }
2583    }
2584
2585    #[traced_test]
2586    #[test]
2587    #[cfg(feature = "hs-service")]
2588    fn allow_stream_requests_twice() {
2589        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2590            let (chan, _rx, _sink) = working_fake_channel(&rt);
2591            let (circ, _send) = newcirc(&rt, chan).await;
2592
2593            let _incoming = circ
2594                .allow_stream_requests(
2595                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2596                    circ.last_hop_num().unwrap(),
2597                    AllowAllStreamsFilter,
2598                )
2599                .await
2600                .unwrap();
2601
2602            let incoming = circ
2603                .allow_stream_requests(
2604                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2605                    circ.last_hop_num().unwrap(),
2606                    AllowAllStreamsFilter,
2607                )
2608                .await;
2609
2610            // There can only be one IncomingStream at a time on any given circuit.
2611            assert!(incoming.is_err());
2612        });
2613    }
2614
2615    #[traced_test]
2616    #[test]
2617    #[cfg(feature = "hs-service")]
2618    fn allow_stream_requests() {
2619        use tor_cell::relaycell::msg::BeginFlags;
2620
2621        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2622            const TEST_DATA: &[u8] = b"ping";
2623
2624            let (chan, _rx, _sink) = working_fake_channel(&rt);
2625            let (circ, mut send) = newcirc(&rt, chan).await;
2626
2627            let rfmt = RelayCellFormat::V0;
2628
2629            // A helper channel for coordinating the "client"/"service" interaction
2630            let (tx, rx) = oneshot::channel();
2631            let mut incoming = circ
2632                .allow_stream_requests(
2633                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2634                    circ.last_hop_num().unwrap(),
2635                    AllowAllStreamsFilter,
2636                )
2637                .await
2638                .unwrap();
2639
2640            let simulate_service = async move {
2641                let stream = incoming.next().await.unwrap();
2642                let mut data_stream = stream
2643                    .accept_data(relaymsg::Connected::new_empty())
2644                    .await
2645                    .unwrap();
2646                // Notify the client task we're ready to accept DATA cells
2647                tx.send(()).unwrap();
2648
2649                // Read the data the client sent us
2650                let mut buf = [0_u8; TEST_DATA.len()];
2651                data_stream.read_exact(&mut buf).await.unwrap();
2652                assert_eq!(&buf, TEST_DATA);
2653
2654                circ
2655            };
2656
2657            let simulate_client = async move {
2658                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2659                let body: BoxedCellBody =
2660                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2661                        .encode(rfmt, &mut testing_rng())
2662                        .unwrap();
2663                let begin_msg = chanmsg::Relay::from(body);
2664
2665                // Pretend to be a client at the other end of the circuit sending a begin cell
2666                send.send(ClientCircChanMsg::Relay(begin_msg))
2667                    .await
2668                    .unwrap();
2669
2670                // Wait until the service is ready to accept data
2671                // TODO: we shouldn't need to wait! This is needed because the service will reject
2672                // any DATA cells that aren't associated with a known stream. We need to wait until
2673                // the service receives our BEGIN cell (and the reactor updates hop.map with the
2674                // new stream).
2675                rx.await.unwrap();
2676                // Now send some data along the newly established circuit..
2677                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2678                let body: BoxedCellBody =
2679                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2680                        .encode(rfmt, &mut testing_rng())
2681                        .unwrap();
2682                let data_msg = chanmsg::Relay::from(body);
2683
2684                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2685                send
2686            };
2687
2688            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2689        });
2690    }
2691
2692    #[traced_test]
2693    #[test]
2694    #[cfg(feature = "hs-service")]
2695    fn accept_stream_after_reject() {
2696        use tor_cell::relaycell::msg::BeginFlags;
2697        use tor_cell::relaycell::msg::EndReason;
2698
2699        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2700            const TEST_DATA: &[u8] = b"ping";
2701            const STREAM_COUNT: usize = 2;
2702            let rfmt = RelayCellFormat::V0;
2703
2704            let (chan, _rx, _sink) = working_fake_channel(&rt);
2705            let (circ, mut send) = newcirc(&rt, chan).await;
2706
2707            // A helper channel for coordinating the "client"/"service" interaction
2708            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2709
2710            let mut incoming = circ
2711                .allow_stream_requests(
2712                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2713                    circ.last_hop_num().unwrap(),
2714                    AllowAllStreamsFilter,
2715                )
2716                .await
2717                .unwrap();
2718
2719            let simulate_service = async move {
2720                // Process 2 incoming streams
2721                for i in 0..STREAM_COUNT {
2722                    let stream = incoming.next().await.unwrap();
2723
2724                    // Reject the first one
2725                    if i == 0 {
2726                        stream
2727                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2728                            .await
2729                            .unwrap();
2730                        // Notify the client
2731                        tx.send(()).await.unwrap();
2732                        continue;
2733                    }
2734
2735                    let mut data_stream = stream
2736                        .accept_data(relaymsg::Connected::new_empty())
2737                        .await
2738                        .unwrap();
2739                    // Notify the client task we're ready to accept DATA cells
2740                    tx.send(()).await.unwrap();
2741
2742                    // Read the data the client sent us
2743                    let mut buf = [0_u8; TEST_DATA.len()];
2744                    data_stream.read_exact(&mut buf).await.unwrap();
2745                    assert_eq!(&buf, TEST_DATA);
2746                }
2747
2748                circ
2749            };
2750
2751            let simulate_client = async move {
2752                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2753                let body: BoxedCellBody =
2754                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2755                        .encode(rfmt, &mut testing_rng())
2756                        .unwrap();
2757                let begin_msg = chanmsg::Relay::from(body);
2758
2759                // Pretend to be a client at the other end of the circuit sending 2 identical begin
2760                // cells (the first one will be rejected by the test service).
2761                for _ in 0..STREAM_COUNT {
2762                    send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
2763                        .await
2764                        .unwrap();
2765
2766                    // Wait until the service rejects our request
2767                    rx.next().await.unwrap();
2768                }
2769
2770                // Now send some data along the newly established circuit..
2771                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2772                let body: BoxedCellBody =
2773                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2774                        .encode(rfmt, &mut testing_rng())
2775                        .unwrap();
2776                let data_msg = chanmsg::Relay::from(body);
2777
2778                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2779                send
2780            };
2781
2782            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2783        });
2784    }
2785
2786    #[traced_test]
2787    #[test]
2788    #[cfg(feature = "hs-service")]
2789    fn incoming_stream_bad_hop() {
2790        use tor_cell::relaycell::msg::BeginFlags;
2791
2792        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2793            /// Expect the originator of the BEGIN cell to be hop 1.
2794            const EXPECTED_HOP: u8 = 1;
2795            let rfmt = RelayCellFormat::V0;
2796
2797            let (chan, _rx, _sink) = working_fake_channel(&rt);
2798            let (circ, mut send) = newcirc(&rt, chan).await;
2799
2800            // Expect to receive incoming streams from hop EXPECTED_HOP
2801            let mut incoming = circ
2802                .allow_stream_requests(
2803                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2804                    EXPECTED_HOP.into(),
2805                    AllowAllStreamsFilter,
2806                )
2807                .await
2808                .unwrap();
2809
2810            let simulate_service = async move {
2811                // The originator of the cell is actually the last hop on the circuit, not hop 1,
2812                // so we expect the reactor to shut down.
2813                assert!(incoming.next().await.is_none());
2814                circ
2815            };
2816
2817            let simulate_client = async move {
2818                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2819                let body: BoxedCellBody =
2820                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2821                        .encode(rfmt, &mut testing_rng())
2822                        .unwrap();
2823                let begin_msg = chanmsg::Relay::from(body);
2824
2825                // Pretend to be a client at the other end of the circuit sending a begin cell
2826                send.send(ClientCircChanMsg::Relay(begin_msg))
2827                    .await
2828                    .unwrap();
2829
2830                send
2831            };
2832
2833            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2834        });
2835    }
2836
2837    #[traced_test]
2838    #[test]
2839    #[cfg(feature = "conflux")]
2840    fn multipath_circ_validation() {
2841        use std::error::Error as _;
2842
2843        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2844            let params = CircParameters::default();
2845            let invalid_tunnels = [
2846                setup_bad_conflux_tunnel(&rt).await,
2847                setup_conflux_tunnel(&rt, true, params).await,
2848            ];
2849
2850            for tunnel in invalid_tunnels {
2851                let TestTunnelCtx {
2852                    tunnel: _tunnel,
2853                    circs: _circs,
2854                    conflux_link_rx,
2855                } = tunnel;
2856
2857                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2858                let err_src = conflux_hs_err.source().unwrap();
2859
2860                // The two circuits don't end in the same hop (no join point),
2861                // so the reactor will refuse to link them
2862                assert!(err_src
2863                    .to_string()
2864                    .contains("one more more conflux circuits are invalid"));
2865            }
2866        });
2867    }
2868
2869    // TODO: this structure could be reused for the other tests,
2870    // to address nickm's comment:
2871    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
2872    #[derive(Debug)]
2873    #[allow(unused)]
2874    #[cfg(feature = "conflux")]
2875    struct TestCircuitCtx {
2876        chan_rx: Receiver<AnyChanCell>,
2877        chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2878        circ_sink: CircuitRxSender,
2879    }
2880
2881    #[derive(Debug)]
2882    #[cfg(feature = "conflux")]
2883    struct TestTunnelCtx {
2884        tunnel: Arc<ClientCirc>,
2885        circs: Vec<TestCircuitCtx>,
2886        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
2887    }
2888
2889    /// Wait for a LINK cell to arrive on the specified channel and return its payload.
2890    #[cfg(feature = "conflux")]
2891    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2892        // Wait for the LINK cell...
2893        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2894        let rmsg = match chmsg {
2895            AnyChanMsg::Relay(r) => {
2896                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2897                    .unwrap()
2898            }
2899            other => panic!("{:?}", other),
2900        };
2901        let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2902
2903        let link = match rmsg {
2904            AnyRelayMsg::ConfluxLink(link) => link,
2905            _ => panic!("unexpected relay message {rmsg:?}"),
2906        };
2907
2908        assert!(streamid.is_none());
2909
2910        link
2911    }
2912
2913    #[cfg(feature = "conflux")]
2914    async fn setup_conflux_tunnel(
2915        rt: &MockRuntime,
2916        same_hops: bool,
2917        params: CircParameters,
2918    ) -> TestTunnelCtx {
2919        let hops1 = hop_details(3, 0);
2920        let hops2 = if same_hops {
2921            hops1.clone()
2922        } else {
2923            hop_details(3, 10)
2924        };
2925
2926        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
2927        let (circ1, sink1) = newcirc_ext(
2928            rt,
2929            UniqId::new(1, 3),
2930            chan1,
2931            hops1,
2932            2.into(),
2933            params.clone(),
2934        )
2935        .await;
2936
2937        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
2938
2939        let (circ2, sink2) =
2940            newcirc_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
2941
2942        let (answer_tx, answer_rx) = oneshot::channel();
2943        circ2
2944            .command
2945            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
2946            .unwrap();
2947
2948        let circuit = answer_rx.await.unwrap().unwrap();
2949        // The circuit should be shutting down its reactor
2950        rt.advance_until_stalled().await;
2951        assert!(circ2.is_closing());
2952
2953        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
2954        // Tell the first circuit to link with the second and form a multipath tunnel
2955        circ1
2956            .control
2957            .unbounded_send(CtrlMsg::LinkCircuits {
2958                circuits: vec![circuit],
2959                answer: conflux_link_tx,
2960            })
2961            .unwrap();
2962
2963        let circ_ctx1 = TestCircuitCtx {
2964            chan_rx: rx1,
2965            chan_tx: chan_sink1,
2966            circ_sink: sink1,
2967        };
2968
2969        let circ_ctx2 = TestCircuitCtx {
2970            chan_rx: rx2,
2971            chan_tx: chan_sink2,
2972            circ_sink: sink2,
2973        };
2974
2975        TestTunnelCtx {
2976            tunnel: circ1,
2977            circs: vec![circ_ctx1, circ_ctx2],
2978            conflux_link_rx,
2979        }
2980    }
2981
2982    #[cfg(feature = "conflux")]
2983    async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2984        // Our 2 test circuits are identical, so they both have the same guards,
2985        // which technically violates the conflux set rule mentioned in prop354.
2986        // For testing purposes this is fine, but in production we'll need to ensure
2987        // the calling code prevents guard reuse (except in the case where
2988        // one of the guards happens to be Guard + Exit)
2989        let same_hops = true;
2990        let params = CircParameters::new(true, build_cc_vegas_params());
2991        setup_conflux_tunnel(rt, same_hops, params).await
2992    }
2993
2994    #[cfg(feature = "conflux")]
2995    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2996        // The two circuits don't share any hops,
2997        // so they won't end in the same hop (no join point),
2998        // causing the reactor to refuse to link them.
2999        let same_hops = false;
3000        let params = CircParameters::new(true, build_cc_vegas_params());
3001        setup_conflux_tunnel(rt, same_hops, params).await
3002    }
3003
3004    #[traced_test]
3005    #[test]
3006    #[cfg(feature = "conflux")]
3007    fn reject_conflux_linked_before_hs() {
3008        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3009            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3010            let (circ, mut sink) = newcirc(&rt, chan).await;
3011
3012            let nonce = V1Nonce::new(&mut testing_rng());
3013            let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3014            // Send a LINKED cell
3015            let linked = relaymsg::ConfluxLinked::new(payload).into();
3016            sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3017
3018            rt.advance_until_stalled().await;
3019            assert!(circ.is_closing());
3020        });
3021    }
3022
3023    #[traced_test]
3024    #[test]
3025    #[cfg(feature = "conflux")]
3026    fn conflux_hs_timeout() {
3027        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3028            let TestTunnelCtx {
3029                tunnel: _tunnel,
3030                circs,
3031                conflux_link_rx,
3032            } = setup_good_conflux_tunnel(&rt).await;
3033
3034            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3035
3036            // Wait for the LINK cell
3037            let link = await_link_payload(&mut circ1.chan_rx).await;
3038
3039            // Send a LINK cell on the first leg...
3040            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3041            circ1
3042                .circ_sink
3043                .send(rmsg_to_ccmsg(None, linked))
3044                .await
3045                .unwrap();
3046
3047            // Do nothing, and wait for the handshake to timeout on the second leg
3048            rt.advance_by(Duration::from_secs(60)).await;
3049
3050            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3051
3052            // Get the handshake results of each circuit
3053            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
3054                conflux_hs_res.try_into().unwrap();
3055
3056            assert!(res1.is_ok());
3057
3058            let err = res2.unwrap_err();
3059            assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
3060        });
3061    }
3062
3063    #[traced_test]
3064    #[test]
3065    #[cfg(feature = "conflux")]
3066    fn conflux_bad_hs() {
3067        use crate::util::err::ConfluxHandshakeError;
3068
3069        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3070            let nonce = V1Nonce::new(&mut testing_rng());
3071            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3072            //let extended2 = relaymsg::Extended2::new(vec![]).into();
3073            let bad_hs_responses = [
3074                (
3075                    rmsg_to_ccmsg(
3076                        None,
3077                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
3078                    ),
3079                    "Received CONFLUX_LINKED cell with mismatched nonce",
3080                ),
3081                (
3082                    rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
3083                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
3084                ),
3085                (
3086                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3087                    "Received CONFLUX_SWITCH on unlinked circuit?!",
3088                ),
3089                // TODO: this currently causes the reactor to shut down immediately,
3090                // without sending a response on the handshake channel
3091                /*
3092                (
3093                    rmsg_to_ccmsg(None, extended2),
3094                    "Received CONFLUX_LINKED cell with mismatched nonce",
3095                ),
3096                */
3097            ];
3098
3099            for (bad_cell, expected_err) in bad_hs_responses {
3100                let TestTunnelCtx {
3101                    tunnel,
3102                    circs,
3103                    conflux_link_rx,
3104                } = setup_good_conflux_tunnel(&rt).await;
3105
3106                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3107
3108                // Respond with a bogus cell on one of the legs
3109                circ2.circ_sink.send(bad_cell).await.unwrap();
3110
3111                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3112                // Get the handshake results (the handshake results are reported early,
3113                // without waiting for the second circuit leg's handshake to timeout,
3114                // because this is a protocol violation causing the entire tunnel to shut down)
3115                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
3116                    conflux_hs_res.try_into().unwrap();
3117
3118                match res2.unwrap_err() {
3119                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
3120                        assert_eq!(e, expected_err);
3121                    }
3122                    e => panic!("unexpected error: {e:?}"),
3123                }
3124
3125                assert!(tunnel.is_closing());
3126            }
3127        });
3128    }
3129
3130    #[traced_test]
3131    #[test]
3132    #[cfg(feature = "conflux")]
3133    fn unexpected_conflux_cell() {
3134        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3135            let nonce = V1Nonce::new(&mut testing_rng());
3136            let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3137            let bad_cells = [
3138                rmsg_to_ccmsg(
3139                    None,
3140                    relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
3141                ),
3142                rmsg_to_ccmsg(
3143                    None,
3144                    relaymsg::ConfluxLink::new(link_payload.clone()).into(),
3145                ),
3146                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3147            ];
3148
3149            for bad_cell in bad_cells {
3150                let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3151                let (circ, mut sink) = newcirc(&rt, chan).await;
3152
3153                sink.send(bad_cell).await.unwrap();
3154                rt.advance_until_stalled().await;
3155
3156                // Note: unfortunately we can't assert the circuit is
3157                // closing for the reason, because the reactor just logs
3158                // the error and then exits.
3159                assert!(circ.is_closing());
3160            }
3161        });
3162    }
3163
3164    #[traced_test]
3165    #[test]
3166    #[cfg(feature = "conflux")]
3167    fn conflux_bad_linked() {
3168        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3169            let TestTunnelCtx {
3170                tunnel,
3171                circs,
3172                conflux_link_rx: _,
3173            } = setup_good_conflux_tunnel(&rt).await;
3174
3175            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3176
3177            let link = await_link_payload(&mut circ1.chan_rx).await;
3178
3179            // Send a LINK cell on the first leg...
3180            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3181            circ1
3182                .circ_sink
3183                .send(rmsg_to_ccmsg(None, linked))
3184                .await
3185                .unwrap();
3186
3187            // ...and two LINKED cells on the second
3188            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3189            circ2
3190                .circ_sink
3191                .send(rmsg_to_ccmsg(None, linked))
3192                .await
3193                .unwrap();
3194            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3195            circ2
3196                .circ_sink
3197                .send(rmsg_to_ccmsg(None, linked))
3198                .await
3199                .unwrap();
3200
3201            rt.advance_until_stalled().await;
3202
3203            // Receiving a LINKED cell on an already linked leg causes
3204            // the tunnel to be torn down
3205            assert!(tunnel.is_closing());
3206        });
3207    }
3208
3209    #[traced_test]
3210    #[test]
3211    #[cfg(feature = "conflux")]
3212    fn conflux_bad_switch() {
3213        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3214            let bad_switch = [
3215                // SWITCH cells with seqno = 0 are not allowed
3216                relaymsg::ConfluxSwitch::new(0),
3217                // TODO(#2031): from c-tor:
3218                //
3219                // We have to make sure that the switch command is truely
3220                // incrementing the sequence number, or else it becomes
3221                // a side channel that can be spammed for traffic analysis.
3222                //
3223                // We should figure out what this check is supposed to look like,
3224                // and have a test for it
3225            ];
3226
3227            for bad_cell in bad_switch {
3228                let TestTunnelCtx {
3229                    tunnel,
3230                    circs,
3231                    conflux_link_rx,
3232                } = setup_good_conflux_tunnel(&rt).await;
3233
3234                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3235
3236                let link = await_link_payload(&mut circ1.chan_rx).await;
3237
3238                // Send a LINKED cell on both legs
3239                for circ in [&mut circ1, &mut circ2] {
3240                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3241                    circ.circ_sink
3242                        .send(rmsg_to_ccmsg(None, linked))
3243                        .await
3244                        .unwrap();
3245                }
3246
3247                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3248                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3249
3250                // Now send a bad SWITCH cell on *both* legs.
3251                // This will cause both legs to be removed from the conflux set,
3252                // which causes the tunnel reactor to shut down
3253                for circ in [&mut circ1, &mut circ2] {
3254                    let msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
3255                    circ.circ_sink.send(msg).await.unwrap();
3256                }
3257
3258                // The tunnel should be shutting down
3259                rt.advance_until_stalled().await;
3260                assert!(tunnel.is_closing());
3261            }
3262        });
3263    }
3264
3265    // TODO(conflux): add a test for SWITCH handling
3266
3267    /// Run a conflux test endpoint.
3268    #[cfg(feature = "conflux")]
3269    #[derive(Debug)]
3270    enum ConfluxTestEndpoint {
3271        /// Pretend to be an exit relay.
3272        Relay(ConfluxExitState),
3273        /// Client task.
3274        Client {
3275            /// Channel for receiving the outcome of the conflux handshakes.
3276            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3277            /// The tunnel reactor handle
3278            tunnel: Arc<ClientCirc>,
3279            /// Data to send on a stream.
3280            stream_data: Vec<u8>,
3281        },
3282    }
3283
3284    /// Structure for returning the sinks, channels, etc. that must stay
3285    /// alive until the test is complete.
3286    #[allow(unused)]
3287    #[derive(Debug)]
3288    #[cfg(feature = "conflux")]
3289    enum ConfluxEndpointResult {
3290        Circuit(Arc<ClientCirc>),
3291        Relay {
3292            rx: Receiver<ChanCell<AnyChanMsg>>,
3293            sink: CircuitRxSender,
3294        },
3295    }
3296
3297    /// Stream data, shared by all the mock exit endpoints.
3298    #[derive(Debug)]
3299    #[cfg(feature = "conflux")]
3300    struct ConfluxStreamState {
3301        data_recvd: Vec<u8>,
3302        expected_data_len: usize,
3303        begin_recvd: bool,
3304        end_recvd: bool,
3305    }
3306
3307    #[derive(Debug)]
3308    #[cfg(feature = "conflux")]
3309    struct ConfluxExitState {
3310        runtime: MockRuntime,
3311        init_rtt_delay: Option<Duration>,
3312        rtt_delay: Option<Duration>,
3313        leg: usize,
3314        rx: Receiver<ChanCell<AnyChanMsg>>,
3315        sink: CircuitRxSender,
3316        stream_state: Arc<Mutex<ConfluxStreamState>>,
3317        expect_switch: Vec<usize>,
3318    }
3319
3320    #[cfg(feature = "conflux")]
3321    async fn good_exit_handshake(
3322        runtime: &MockRuntime,
3323        init_rtt_delay: Option<Duration>,
3324        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3325        sink: &mut CircuitRxSender,
3326    ) {
3327        // Wait for the LINK cell
3328        let link = await_link_payload(rx).await;
3329
3330        // Introduce an artificial delay, to make one circ have a better initial RTT
3331        // than the other
3332        if let Some(init_rtt_delay) = init_rtt_delay {
3333            runtime.advance_by(init_rtt_delay).await;
3334        }
3335
3336        // Reply with a LINKED cell...
3337        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3338        sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3339
3340        // Wait for the client to respond with LINKED_ACK...
3341        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3342        let rmsg = match chmsg {
3343            AnyChanMsg::Relay(r) => {
3344                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3345                    .unwrap()
3346            }
3347            other => panic!("{other:?}"),
3348        };
3349        let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3350
3351        assert!(matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_)));
3352    }
3353
3354    #[cfg(feature = "conflux")]
3355    async fn run_mock_conflux_exit(state: ConfluxExitState) -> ConfluxEndpointResult {
3356        let ConfluxExitState {
3357            runtime,
3358            init_rtt_delay,
3359            rtt_delay,
3360            leg,
3361            mut rx,
3362            mut sink,
3363            stream_state,
3364            mut expect_switch,
3365        } = state;
3366
3367        // Do the conflux handshake
3368        good_exit_handshake(&runtime, init_rtt_delay, &mut rx, &mut sink).await;
3369
3370        // Expect the client to open a stream, and de-multiplex the received stream data
3371        let stream_len = stream_state.lock().unwrap().expected_data_len;
3372        let mut data_cells_received = 0_usize;
3373        let mut cell_count = 0_usize;
3374        while stream_state.lock().unwrap().data_recvd.len() < stream_len {
3375            // Wait for the BEGIN cell to arrive...
3376            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3377            cell_count += 1;
3378            let rmsg = match chmsg {
3379                AnyChanMsg::Relay(r) => {
3380                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3381                        .unwrap()
3382                }
3383                other => panic!("{:?}", other),
3384            };
3385            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
3386
3387            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3388            let end_recvd = stream_state.lock().unwrap().end_recvd;
3389            match rmsg {
3390                AnyRelayMsg::Begin(_) if begin_recvd => {
3391                    panic!("client tried to open two streams?!");
3392                }
3393                AnyRelayMsg::Begin(_) if !begin_recvd => {
3394                    stream_state.lock().unwrap().begin_recvd = true;
3395                    // Reply with a connected cell...
3396                    let connected = relaymsg::Connected::new_empty().into();
3397                    sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
3398                }
3399                AnyRelayMsg::End(_) if !end_recvd => {
3400                    stream_state.lock().unwrap().end_recvd = true;
3401                    break;
3402                }
3403                AnyRelayMsg::End(_) if end_recvd => {
3404                    panic!("received two END cells for the same stream?!");
3405                }
3406                AnyRelayMsg::ConfluxSwitch(_) => {
3407                    // Ensure we got the SWITCH after the expected number of cells
3408                    let cells_until_switch = expect_switch.remove(0);
3409
3410                    assert_eq!(cells_until_switch, cell_count);
3411
3412                    // To keep the tests simple, we don't handle out of order cells,
3413                    // and simply sort the received data at the end.
3414                    // This ensures all the data was actually received,
3415                    // but it doesn't actually test that the SWITCH cells
3416                    // contain the appropriate seqnos.
3417                    continue;
3418                }
3419                AnyRelayMsg::Data(dat) => {
3420                    data_cells_received += 1;
3421
3422                    stream_state
3423                        .lock()
3424                        .unwrap()
3425                        .data_recvd
3426                        .extend_from_slice(dat.as_ref());
3427                }
3428                _ => panic!("unexpected message {rmsg:?} on leg {leg}"),
3429            }
3430
3431            if data_cells_received == 100 {
3432                // Introduce an artificial delay, to make one circ have worse RTT
3433                // than the other, and thus trigger a SWITCH
3434                if let Some(rtt_delay) = rtt_delay {
3435                    runtime.advance_by(rtt_delay).await;
3436                }
3437
3438                // Make and send a circuit-level SENDME
3439                let sendme =
3440                    relaymsg::Sendme::new_tag(hex!("2100000000000000000000000000000000000000"))
3441                        .into();
3442                sink.send(rmsg_to_ccmsg(None, sendme)).await.unwrap();
3443            }
3444        }
3445
3446        ConfluxEndpointResult::Relay { rx, sink }
3447    }
3448
3449    #[cfg(feature = "conflux")]
3450    async fn run_conflux_client(
3451        tunnel: Arc<ClientCirc>,
3452        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3453        stream_data: Vec<u8>,
3454    ) -> ConfluxEndpointResult {
3455        let res = conflux_link_rx.await;
3456
3457        let res = res.unwrap().unwrap();
3458        assert_eq!(res.len(), 2);
3459
3460        // All circuit legs have completed the conflux handshake,
3461        // so we now have a multipath tunnel
3462
3463        // Now we're ready to open a stream
3464        let mut stream = tunnel
3465            .begin_stream("www.example.com", 443, None)
3466            .await
3467            .unwrap();
3468
3469        stream.write_all(&stream_data).await.unwrap();
3470        stream.flush().await.unwrap();
3471
3472        ConfluxEndpointResult::Circuit(tunnel)
3473    }
3474
3475    #[cfg(feature = "conflux")]
3476    async fn run_conflux_endpoint(endpoint: ConfluxTestEndpoint) -> ConfluxEndpointResult {
3477        match endpoint {
3478            ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3479            ConfluxTestEndpoint::Client {
3480                tunnel,
3481                conflux_link_rx,
3482                stream_data,
3483            } => run_conflux_client(tunnel, conflux_link_rx, stream_data).await,
3484        }
3485    }
3486
3487    #[traced_test]
3488    #[test]
3489    #[cfg(feature = "conflux")]
3490    fn multipath_stream() {
3491        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3492            let TestTunnelCtx {
3493                tunnel,
3494                circs,
3495                conflux_link_rx,
3496            } = setup_good_conflux_tunnel(&rt).await;
3497            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3498
3499            // The stream data we're going to send over the conflux tunnel
3500            let stream_data = (0..255_u8).cycle().take(300 * 498).collect::<Vec<_>>();
3501            let stream_state = Arc::new(Mutex::new(ConfluxStreamState {
3502                data_recvd: vec![],
3503                expected_data_len: stream_data.len(),
3504                begin_recvd: false,
3505                end_recvd: false,
3506            }));
3507
3508            let mut tasks = vec![];
3509            // Note: we can't have two advance_by() calls running
3510            // at the same time (it's a limitation of MockRuntime),
3511            // so we need to be careful to not cause concurrent delays
3512            // on the two circuits.
3513            let relays = [
3514                (
3515                    circ1.chan_rx,
3516                    circ1.circ_sink,
3517                    // We expect the client to start sending on the leg with no initial RTT delay,
3518                    // and then switch to the one with the lower overall RTT
3519                    vec![2],
3520                    None,
3521                    Some(Duration::from_millis(300)),
3522                ),
3523                (
3524                    circ2.chan_rx,
3525                    circ2.circ_sink,
3526                    vec![1],
3527                    Some(Duration::from_millis(200)),
3528                    None,
3529                ),
3530            ];
3531
3532            for (leg, (rx, sink, expect_switch, init_rtt_delay, rtt_delay)) in
3533                relays.into_iter().enumerate()
3534            {
3535                let relay = ConfluxTestEndpoint::Relay(ConfluxExitState {
3536                    runtime: rt.clone(),
3537                    leg,
3538                    init_rtt_delay,
3539                    rtt_delay,
3540                    rx,
3541                    sink,
3542                    stream_state: Arc::clone(&stream_state),
3543                    expect_switch,
3544                });
3545
3546                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3547            }
3548
3549            tasks.push(rt.spawn_join(
3550                "client task".to_string(),
3551                run_conflux_endpoint(ConfluxTestEndpoint::Client {
3552                    tunnel,
3553                    conflux_link_rx,
3554                    stream_data: stream_data.clone(),
3555                }),
3556            ));
3557            let _sinks = futures::future::join_all(tasks).await;
3558
3559            let stream_state = stream_state.lock().unwrap();
3560            assert!(stream_state.begin_recvd);
3561            assert!(stream_state.end_recvd);
3562
3563            // TODO: sort stream_data to work around the lack of handling of
3564            // out-of-order cells at the mock exit
3565            assert_eq!(stream_state.data_recvd, stream_data);
3566        });
3567    }
3568
3569    #[traced_test]
3570    #[test]
3571    #[cfg(all(feature = "conflux", feature = "hs-service"))]
3572    fn conflux_incoming_stream() {
3573        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3574            use std::error::Error as _;
3575
3576            const EXPECTED_HOP: u8 = 1;
3577
3578            let TestTunnelCtx {
3579                tunnel,
3580                circs,
3581                conflux_link_rx,
3582            } = setup_good_conflux_tunnel(&rt).await;
3583
3584            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3585
3586            let link = await_link_payload(&mut circ1.chan_rx).await;
3587            for circ in [&mut circ1, &mut circ2] {
3588                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3589                circ.circ_sink
3590                    .send(rmsg_to_ccmsg(None, linked))
3591                    .await
3592                    .unwrap();
3593            }
3594
3595            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3596            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3597
3598            // TODO(#2002): we don't currently support conflux for onion services
3599            let err = tunnel
3600                .allow_stream_requests(
3601                    &[tor_cell::relaycell::RelayCmd::BEGIN],
3602                    EXPECTED_HOP.into(),
3603                    AllowAllStreamsFilter,
3604                )
3605                .await
3606                // IncomingStream doesn't impl Debug, so we need to map to a different type
3607                .map(|_| ())
3608                .unwrap_err();
3609
3610            let err_src = err.source().unwrap();
3611            assert!(err_src
3612                .to_string()
3613                .contains("Cannot allow stream requests on tunnel with 2 legs"));
3614        });
3615    }
3616}