1
//! Multi-hop paths over the Tor network.
2
//!
3
//! Right now, we only implement "client circuits" -- also sometimes
4
//! called "origin circuits".  A client circuit is one that is
5
//! constructed by this Tor instance, and used on its own behalf to
6
//! send data over the Tor network.
7
//!
8
//! Each circuit has multiple hops over the Tor network: each hop
9
//! knows only the hop before and the hop after.  The client shares a
10
//! separate set of keys with each hop.
11
//!
12
//! To build a circuit, first create a [crate::channel::Channel], then
13
//! call its [crate::channel::Channel::new_circ] method.  This yields
14
//! a [PendingClientCirc] object that won't become live until you call
15
//! one of the methods
16
//! (typically [`PendingClientCirc::create_firsthop`])
17
//! that extends it to its first hop.  After you've
18
//! done that, you can call [`ClientCirc::extend`] on the circuit to
19
//! build it into a multi-hop circuit.  Finally, you can use
20
//! [ClientCirc::begin_stream] to get a Stream object that can be used
21
//! for anonymized data.
22
//!
23
//! # Implementation
24
//!
25
//! Each open circuit has a corresponding Reactor object that runs in
26
//! an asynchronous task, and manages incoming cells from the
27
//! circuit's upstream channel.  These cells are either RELAY cells or
28
//! DESTROY cells.  DESTROY cells are handled immediately.
29
//! RELAY cells are either for a particular stream, in which case they
30
//! get forwarded to a RawCellStream object, or for no particular stream,
31
//! in which case they are considered "meta" cells (like EXTENDED2)
32
//! that should only get accepted if something is waiting for them.
33
//!
34
//! # Limitations
35
//!
36
//! This is client-only.
37

            
38
pub(crate) mod celltypes;
39
pub(crate) mod halfcirc;
40

            
41
#[cfg(feature = "hs-common")]
42
pub mod handshake;
43
#[cfg(not(feature = "hs-common"))]
44
pub(crate) mod handshake;
45

            
46
pub(super) mod path;
47
pub(crate) mod unique_id;
48

            
49
use crate::channel::Channel;
50
use crate::congestion::params::CongestionControlParams;
51
use crate::crypto::cell::HopNum;
52
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53
use crate::memquota::{CircuitAccount, SpecificAccount as _};
54
use crate::stream::{
55
    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56
    StreamReader,
57
};
58
use crate::tunnel::circuit::celltypes::*;
59
use crate::tunnel::reactor::CtrlCmd;
60
use crate::tunnel::reactor::{
61
    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62
};
63
use crate::tunnel::{StreamTarget, TargetHop};
64
use crate::util::skew::ClockSkew;
65
use crate::{Error, ResolveError, Result};
66
use educe::Educe;
67
use path::HopDetail;
68
use tor_cell::{
69
    chancell::CircId,
70
    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
71
};
72

            
73
use tor_error::{bad_api_usage, internal, into_internal};
74
use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
75
use tor_protover::named;
76

            
77
pub use crate::crypto::binding::CircuitBinding;
78
pub use crate::memquota::StreamAccount;
79
pub use crate::tunnel::circuit::unique_id::UniqId;
80

            
81
#[cfg(feature = "hs-service")]
82
use {
83
    crate::stream::{IncomingCmdChecker, IncomingStream},
84
    crate::tunnel::reactor::StreamReqInfo,
85
};
86

            
87
use futures::channel::mpsc;
88
use oneshot_fused_workaround as oneshot;
89

            
90
use crate::congestion::sendme::StreamRecvWindow;
91
use crate::DynTimeProvider;
92
use futures::FutureExt as _;
93
use std::collections::HashMap;
94
use std::net::IpAddr;
95
use std::sync::{Arc, Mutex};
96
use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
97

            
98
use crate::crypto::handshake::ntor::NtorPublicKey;
99

            
100
pub use path::{Path, PathEntry};
101

            
102
/// The size of the buffer for communication between `ClientCirc` and its reactor.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
104

            
105
#[cfg(feature = "send-control-msg")]
106
use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
107

            
108
pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
109
#[cfg(feature = "send-control-msg")]
110
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
111
pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
112

            
113
/// MPSC queue relating to a stream (either inbound or outbound), sender
114
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
115
/// MPSC queue relating to a stream (either inbound or outbound), receiver
116
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
117

            
118
/// MPSC queue for inbound data on its way from channel to circuit, sender
119
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
120
/// MPSC queue for inbound data on its way from channel to circuit, receiver
121
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
122

            
123
#[derive(Debug)]
124
/// A circuit that we have constructed over the Tor network.
125
///
126
/// # Circuit life cycle
127
///
128
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
129
/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
130
/// one of these, you invoke one of its `create_firsthop` methods (typically
131
/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
132
/// [`create_firsthop()`](PendingClientCirc::create_firsthop)).
133
/// Then, to add more hops to the circuit, you can call
134
/// [`extend()`](ClientCirc::extend) on it.
135
///
136
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
137
/// `tor-proto` are probably not what you need.
138
///
139
/// After a circuit is created, it will persist until it is closed in one of
140
/// five ways:
141
///    1. A remote error occurs.
142
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
143
///       circuit.
144
///    3. The circuit's channel is closed.
145
///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
146
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
147
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
148
///       circuit from closing until all those streams have gone away.)
149
///
150
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
151
/// will just be unusable for most purposes.  Most operations on it will fail
152
/// with an error.
153
//
154
// Effectively, this struct contains two Arcs: one for `path` and one for
155
// `control` (which surely has something Arc-like in it).  We cannot unify
156
// these by putting a single Arc around the whole struct, and passing
157
// an Arc strong reference to the `Reactor`, because then `control` would
158
// not be dropped when the last user of the circuit goes away.  We could
159
// make the reactor have a weak reference but weak references are more
160
// expensive to dereference.
161
//
162
// Because of the above, cloning this struct is always going to involve
163
// two atomic refcount changes/checks.  Wrapping it in another Arc would
164
// be overkill.
165
//
166
pub struct ClientCirc {
167
    /// Mutable state shared with the `Reactor`.
168
    mutable: Arc<TunnelMutableState>,
169
    /// A unique identifier for this circuit.
170
    unique_id: UniqId,
171
    /// Channel to send control messages to the reactor.
172
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
173
    /// Channel to send commands to the reactor.
174
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
175
    /// A future that resolves to Cancelled once the reactor is shut down,
176
    /// meaning that the circuit is closed.
177
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
178
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
179
    /// For testing purposes: the CircId, for use in peek_circid().
180
    #[cfg(test)]
181
    circid: CircId,
182
    /// Memory quota account
183
    memquota: CircuitAccount,
184
    /// Time provider
185
    time_provider: DynTimeProvider,
186
}
187

            
188
/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
189
///
190
/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
191
/// but it is currently the best option we have for sharing
192
/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
193
/// In practice, these mutexes won't be accessed very often
194
/// (they're accessed for writing when a circuit is extended,
195
/// and for reading by the various `ClientCirc` APIs),
196
/// so they shouldn't really impact performance.
197
///
198
/// Alternatively, the circuit state information could be shared
199
/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
200
/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
201
/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
202
/// asynchronous, which will significantly complicate their callsites,
203
/// which would in turn need to be made async too.
204
///
205
/// We should revisit this decision at some point, and decide whether an async API
206
/// would be preferable.
207
#[derive(Debug, Default)]
208
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
209

            
210
impl TunnelMutableState {
211
    /// Add the [`MutableState`] of a circuit.
212
288
    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
213
288
        #[allow(unused)] // unused in non-debug builds
214
288
        let state = self
215
288
            .0
216
288
            .lock()
217
288
            .expect("lock poisoned")
218
288
            .insert(unique_id, mutable);
219
288

            
220
288
        debug_assert!(state.is_none());
221
288
    }
222

            
223
    /// Remove the [`MutableState`] of a circuit.
224
24
    pub(super) fn remove(&self, unique_id: UniqId) {
225
24
        #[allow(unused)] // unused in non-debug builds
226
24
        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
24

            
228
24
        debug_assert!(state.is_some());
229
24
    }
230

            
231
    /// Return a [`Path`] object describing all the hops in the specified circuit.
232
    ///
233
    /// See [`MutableState::path`].
234
32
    fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
235
32
        let lock = self.0.lock().expect("lock poisoned");
236
32
        let mutable = lock
237
32
            .get(&unique_id)
238
32
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
239

            
240
32
        Ok(mutable.path())
241
32
    }
242

            
243
    /// Return a description of the first hop of this circuit.
244
    ///
245
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
246
    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
247
    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
248
        let lock = self.0.lock().expect("lock poisoned");
249
        let mutable = lock
250
            .get(&unique_id)
251
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
252

            
253
        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
254
            path::HopDetail::Relay(r) => r,
255
            #[cfg(feature = "hs-common")]
256
            path::HopDetail::Virtual => {
257
                panic!("somehow made a circuit with a virtual first hop.")
258
            }
259
        });
260

            
261
        Ok(first_hop)
262
    }
263

            
264
    /// Return the [`HopNum`] of the last hop of the specified circuit.
265
    ///
266
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
267
    ///
268
    /// See [`MutableState::last_hop_num`].
269
32
    fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
270
32
        let lock = self.0.lock().expect("lock poisoned");
271
32
        let mutable = lock
272
32
            .get(&unique_id)
273
32
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
274

            
275
32
        Ok(mutable.last_hop_num())
276
32
    }
277

            
278
    /// Return the number of hops in the specified circuit.
279
    ///
280
    /// See [`MutableState::n_hops`].
281
80
    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
282
80
        let lock = self.0.lock().expect("lock poisoned");
283
80
        let mutable = lock
284
80
            .get(&unique_id)
285
80
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
286

            
287
80
        Ok(mutable.n_hops())
288
80
    }
289

            
290
    /// Return the cryptographic material used to prove knowledge of a shared
291
    /// secret with with `hop` on the circuit with the specified `unique_id`.
292
    fn binding_key(&self, unique_id: UniqId, hop: HopNum) -> Result<Option<CircuitBinding>> {
293
        let lock = self.0.lock().expect("lock poisoned");
294
        let mutable = lock
295
            .get(&unique_id)
296
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297

            
298
        Ok(mutable.binding_key(hop))
299
    }
300

            
301
    /// Return the number of legs in this tunnel.
302
    ///
303
    /// TODO(conflux-fork): this can be removed once we modify `path_ref`
304
    /// to return *all* the Paths in the tunnel.
305
44
    fn n_legs(&self) -> usize {
306
44
        let lock = self.0.lock().expect("lock poisoned");
307
44
        lock.len()
308
44
    }
309
}
310

            
311
/// The mutable state of a circuit.
312
32
#[derive(Educe, Default)]
313
#[educe(Debug)]
314
pub(super) struct MutableState(Mutex<CircuitState>);
315

            
316
impl MutableState {
317
    /// Add a hop to the path of this circuit.
318
696
    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
319
696
        let mut mutable = self.0.lock().expect("poisoned lock");
320
696
        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
321
696
        mutable.binding.push(binding);
322
696
    }
323

            
324
    /// Get a copy of the circuit's current [`path::Path`].
325
112
    pub(super) fn path(&self) -> Arc<path::Path> {
326
112
        let mutable = self.0.lock().expect("poisoned lock");
327
112
        Arc::clone(&mutable.path)
328
112
    }
329

            
330
    /// Return the cryptographic material used to prove knowledge of a shared
331
    /// secret with with `hop`.
332
    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
333
        let mutable = self.0.lock().expect("poisoned lock");
334

            
335
        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
336
        // NOTE: I'm not thrilled to have to copy this information, but we use
337
        // it very rarely, so it's not _that_ bad IMO.
338
    }
339

            
340
    /// Return a description of the first hop of this circuit.
341
    fn first_hop(&self) -> Option<HopDetail> {
342
        let mutable = self.0.lock().expect("poisoned lock");
343
        mutable.path.first_hop()
344
    }
345

            
346
    /// Return the [`HopNum`] of the last hop of this circuit.
347
    ///
348
    /// NOTE: This function will return the [`HopNum`] of the hop
349
    /// that is _currently_ the last. If there is an extend operation in progress,
350
    /// the currently pending hop may or may not be counted, depending on whether
351
    /// the extend operation finishes before this call is done.
352
32
    fn last_hop_num(&self) -> Option<HopNum> {
353
32
        let mutable = self.0.lock().expect("poisoned lock");
354
32
        mutable.path.last_hop_num()
355
32
    }
356

            
357
    /// Return the number of hops in this circuit.
358
    ///
359
    /// NOTE: This function will currently return only the number of hops
360
    /// _currently_ in the circuit. If there is an extend operation in progress,
361
    /// the currently pending hop may or may not be counted, depending on whether
362
    /// the extend operation finishes before this call is done.
363
80
    fn n_hops(&self) -> usize {
364
80
        let mutable = self.0.lock().expect("poisoned lock");
365
80
        mutable.path.n_hops()
366
80
    }
367
}
368

            
369
/// The shared state of a circuit.
370
32
#[derive(Educe, Default)]
371
#[educe(Debug)]
372
pub(super) struct CircuitState {
373
    /// Information about this circuit's path.
374
    ///
375
    /// This is stored in an Arc so that we can cheaply give a copy of it to
376
    /// client code; when we need to add a hop (which is less frequent) we use
377
    /// [`Arc::make_mut()`].
378
    path: Arc<path::Path>,
379

            
380
    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
381
    /// in the circuit's path.
382
    ///
383
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
384
    /// fair chance that this will change in the future, and I don't want other
385
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
386
    /// an `Option`.
387
    #[educe(Debug(ignore))]
388
    binding: Vec<Option<CircuitBinding>>,
389
}
390

            
391
/// A ClientCirc that needs to send a create cell and receive a created* cell.
392
///
393
/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
394
/// to negotiate the cryptographic handshake with the first hop.
395
pub struct PendingClientCirc {
396
    /// A oneshot receiver on which we'll receive a CREATED* cell,
397
    /// or a DESTROY cell.
398
    recvcreated: oneshot::Receiver<CreateResponse>,
399
    /// The ClientCirc object that we can expose on success.
400
    circ: Arc<ClientCirc>,
401
}
402

            
403
/// Description of the network's current rules for building circuits.
404
///
405
/// This type describes rules derived from the consensus,
406
/// and possibly amended by our own configuration.
407
///
408
/// Typically, this type created once for an entire circuit,
409
/// and any special per-hop information is derived
410
/// from each hop as a CircTarget.
411
/// Note however that callers _may_ provide different `CircParameters`
412
/// for different hops within a circuit if they have some reason to do so,
413
/// so we do not enforce that every hop in a circuit has the same `CircParameters`.
414
#[non_exhaustive]
415
#[derive(Clone, Debug)]
416
pub struct CircParameters {
417
    /// Whether we should include ed25519 identities when we send
418
    /// EXTEND2 cells.
419
    pub extend_by_ed25519_id: bool,
420
    /// Congestion control parameters for this circuit.
421
    pub ccontrol: CongestionControlParams,
422
}
423

            
424
/// The settings we use for single hop of a circuit.
425
///
426
/// Unlike [`CircParameters`], this type is crate-internal.
427
/// We construct it based on our settings from the circuit,
428
/// and from the hop's actual capabilities.
429
/// Then, we negotiate with the hop as part of circuit
430
/// creation/extension to determine the actual settings that will be in use.
431
/// Finally, we use those settings to construct the negotiated circuit hop.
432
//
433
// TODO: Relays should probably derive an instance of this type too, as
434
// part of the circuit creation handshake.
435
#[derive(Clone, Debug)]
436
pub(super) struct HopSettings {
437
    /// The negotiated congestion control settings for this circuit.
438
    pub(super) ccontrol: CongestionControlParams,
439
}
440

            
441
impl HopSettings {
442
    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
443
    /// and `caps` (a set of protocol capabilities for a circuit target).
444
    ///
445
    /// The resulting settings will represent what the client would prefer to negotiate
446
    /// (determined by `params`),
447
    /// as modified by what the target relay is believed to support (represented by `caps`).
448
    ///
449
    /// This represents the `HopSettings` in a pre-negotiation state:
450
    /// the circuit negotiation process will modify it.
451
    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
452
728
    pub(super) fn from_params_and_caps(
453
728
        params: &CircParameters,
454
728
        caps: &tor_protover::Protocols,
455
728
    ) -> Result<Self> {
456
728
        let mut settings = Self {
457
728
            ccontrol: params.ccontrol.clone(),
458
728
        };
459
728

            
460
728
        match settings.ccontrol.alg() {
461
504
            crate::ccparams::Algorithm::FixedWindow(_) => {}
462
            crate::ccparams::Algorithm::Vegas(_) => {
463
                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
464
224
                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
465
                    settings.ccontrol.use_fallback_alg();
466
224
                }
467
            }
468
        }
469

            
470
728
        Ok(settings)
471
728
    }
472

            
473
    /// Return a new `HopSettings` based on this one,
474
    /// representing the settings that we should use
475
    /// if circuit negotiation will be impossible.
476
    ///
477
    /// (Circuit negotiation is impossible when using the legacy ntor protocol,
478
    /// and when using CRATE_FAST.  It is currently unsupported with virtual hops.)
479
56
    pub(super) fn without_negotiation(mut self) -> Self {
480
56
        self.ccontrol.use_fallback_alg();
481
56
        self
482
56
    }
483
}
484

            
485
/// Test-only defaults: extend by ed25519 identity, with the fixed-window
/// congestion control parameters from the congestion test utilities.
#[cfg(test)]
impl std::default::Default for CircParameters {
    fn default() -> Self {
        Self {
            extend_by_ed25519_id: true,
            ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
        }
    }
}
494

            
495
impl CircParameters {
496
    /// Constructor
497
561
    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
498
561
        Self {
499
561
            extend_by_ed25519_id,
500
561
            ccontrol,
501
561
        }
502
561
    }
503
}
504

            
505
impl ClientCirc {
506
    /// Return a description of the first hop of this circuit.
507
    ///
508
    /// # Panics
509
    ///
510
    /// Panics if there is no first hop.  (This should be impossible outside of
511
    /// the tor-proto crate, but within the crate it's possible to have a
512
    /// circuit with no hops.)
513
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
514
        Ok(self
515
            .mutable
516
            .first_hop(self.unique_id)
517
            .map_err(|_| Error::CircuitClosed)?
518
            .expect("called first_hop on an un-constructed circuit"))
519
    }
520

            
521
    /// Return the [`HopNum`] of the last hop of this circuit.
522
    ///
523
    /// Returns an error if there is no last hop.  (This should be impossible outside of the
524
    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
525
    ///
526
    /// NOTE: This function will return the [`HopNum`] of the hop
527
    /// that is _currently_ the last. If there is an extend operation in progress,
528
    /// the currently pending hop may or may not be counted, depending on whether
529
    /// the extend operation finishes before this call is done.
530
32
    pub fn last_hop_num(&self) -> Result<HopNum> {
531
32
        Ok(self
532
32
            .mutable
533
32
            .last_hop_num(self.unique_id)?
534
32
            .ok_or_else(|| internal!("no last hop index"))?)
535
32
    }
536

            
537
    /// Return a [`Path`] object describing all the hops in this circuit.
538
    ///
539
    /// Note that this `Path` is not automatically updated if the circuit is
540
    /// extended.
541
32
    pub fn path_ref(&self) -> Result<Arc<Path>> {
542
32
        self.mutable
543
32
            .path_ref(self.unique_id)
544
32
            .map_err(|_| Error::CircuitClosed)
545
32
    }
546

            
547
    /// Get the clock skew claimed by the first hop of the circuit.
548
    ///
549
    /// See [`Channel::clock_skew()`].
550
    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
551
        let (tx, rx) = oneshot::channel();
552

            
553
        self.control
554
            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
555
            .map_err(|_| Error::CircuitClosed)?;
556

            
557
        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
558
    }
559

            
560
    /// Return a reference to this circuit's memory quota account
561
60
    pub fn mq_account(&self) -> &CircuitAccount {
562
60
        &self.memquota
563
60
    }
564

            
565
    /// Return the cryptographic material used to prove knowledge of a shared
566
    /// secret with with `hop`.
567
    ///
568
    /// See [`CircuitBinding`] for more information on how this is used.
569
    ///
570
    /// Return None if we have no circuit binding information for the hop, or if
571
    /// the hop does not exist.
572
    pub fn binding_key(&self, hop: HopNum) -> Result<Option<CircuitBinding>> {
573
        self.mutable
574
            .binding_key(self.unique_id, hop)
575
            .map_err(|_| Error::CircuitClosed)
576
    }
577

            
578
    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
    ///
    /// To use this:
    ///
    ///  0. Create an inter-task channel you'll use to receive
    ///     the outcome of your conversation,
    ///     and bundle it into a [`MsgHandler`].
    ///
    ///  1. Call `start_conversation`.
    ///     This will install your handler, for incoming messages,
    ///     and send the outgoing message (if you provided one).
    ///     After that, each message on the circuit
    ///     that isn't handled by the core machinery
    ///     is passed to your provided `reply_handler`.
    ///
    ///  2. Possibly call `send_msg` on the [`Conversation`],
    ///     from the call site of `start_conversation`,
    ///     possibly multiple times, from time to time,
    ///     to send further desired messages to the peer.
    ///
    ///  3. In your [`MsgHandler`], process the incoming messages.
    ///     You may respond by
    ///     sending additional messages.
    ///     When the protocol exchange is finished,
    ///     `MsgHandler::handle_msg` should return
    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
    ///
    /// If you don't need the `Conversation` to send followup messages,
    /// you may simply drop it,
    /// and rely on the responses you get from your handler,
    /// on the channel from step 0 above.
    /// Your handler will remain installed and able to process incoming messages
    /// until it returns `ConversationFinished`.
    ///
    /// (If you don't want to accept any replies at all, it may be
    /// simpler to use [`ClientCirc::send_raw_msg`].)
    ///
    /// Note that it is quite possible to use this function to violate the tor
    /// protocol; most users of this API will not need to call it.  It is used
    /// to implement most of the onion service handshake.
    ///
    /// # Limitations
    ///
    /// Only one conversation may be active at any one time,
    /// for any one circuit.
    /// This generally means that this function should not be called
    /// on a circuit which might be shared with anyone else.
    ///
    /// Likewise, it is forbidden to try to extend the circuit,
    /// while the conversation is in progress.
    ///
    /// After the conversation has finished, the circuit may be extended.
    /// Or, `start_conversation` may be called again;
    /// but, in that case there will be a gap between the two conversations,
    /// during which no `MsgHandler` is installed,
    /// and unexpected incoming messages would close the circuit.
    ///
    /// If these restrictions are violated, the circuit will be closed with an error.
    ///
    /// ## Precise definition of the lifetime of a conversation
    ///
    /// A conversation is in progress from entry to `start_conversation`,
    /// until entry to the body of the [`MsgHandler::handle_msg`]
    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
    /// (*Entry* since `handle_msg` is synchronously embedded
    /// into the incoming message processing.)
    /// So you may start a new conversation as soon as you have the final response
    /// via your inter-task channel from (0) above.
    ///
    /// The lifetime relationship of the [`Conversation`],
    /// vs the handler returning `ConversationFinished`
    /// is not enforced by the type system.
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
    // at least while allowing sending followup messages from outside the handler.
    //
    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
    //   tor-proto interface.
    #[cfg(feature = "send-control-msg")]
    pub async fn start_conversation(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
        hop_num: HopNum,
    ) -> Result<Conversation<'_>> {
        let handler = Box::new(UserMsgHandler::new(hop_num, reply_handler));
        let conversation = Conversation(self);
        conversation.send_internal(msg, Some(handler)).await?;
        Ok(conversation)
    }
667

            
668
    /// Send an ad-hoc message to a given hop on the circuit, without expecting
    /// a reply.
    ///
    /// (If you want to handle one or more possible replies, see
    /// [`ClientCirc::start_conversation`].)
    ///
    /// # Errors
    ///
    /// Returns [`Error::CircuitClosed`] if the request can't be sent on the
    /// control channel, or if the reply channel is dropped before an answer
    /// arrives.  Otherwise returns whatever result is sent back on the reply
    /// channel.
    #[cfg(feature = "send-control-msg")]
    pub async fn send_raw_msg(
        &self,
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
        hop_num: HopNum,
    ) -> Result<()> {
        let (sender, receiver) = oneshot::channel();
        let ctrl_msg = CtrlMsg::SendMsg {
            hop_num,
            msg,
            sender,
        };
        self.control
            .unbounded_send(ctrl_msg)
            .map_err(|_| Error::CircuitClosed)?;

        receiver.await.map_err(|_| Error::CircuitClosed)?
    }
691

            
692
    /// Tell this circuit to begin allowing the final hop of the circuit to try
693
    /// to create new Tor streams, and to return those pending requests in an
694
    /// asynchronous stream.
695
    ///
696
    /// Ordinarily, these requests are rejected.
697
    ///
698
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
699
    /// If such a [`Stream`](futures::Stream) already exists, this method will return
700
    /// an error.
701
    ///
702
    /// After this method has been called on a circuit, the circuit is expected
703
    /// to receive requests of this type indefinitely, until it is finally closed.
704
    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
705
    ///
706
    /// Only onion services (and eventually) exit relays should call this
707
    /// method.
708
    //
709
    // TODO: Someday, we might want to allow a stream request handler to be
710
    // un-registered.  However, nothing in the Tor protocol requires it.
711
    #[cfg(feature = "hs-service")]
712
44
    pub async fn allow_stream_requests(
713
44
        self: &Arc<ClientCirc>,
714
44
        allow_commands: &[tor_cell::relaycell::RelayCmd],
715
44
        hop_num: HopNum,
716
44
        filter: impl crate::stream::IncomingStreamRequestFilter,
717
44
    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
718
        use futures::stream::StreamExt;
719

            
720
        use crate::tunnel::HopLocation;
721

            
722
        /// The size of the channel receiving IncomingStreamRequestContexts.
723
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
724

            
725
        // TODO(#2002): support onion service conflux
726
44
        let circ_count = self.mutable.n_legs();
727
44
        if circ_count != 1 {
728
4
            return Err(
729
4
                internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
730
4
            );
731
40
        }
732
40

            
733
40
        let time_prov = self.time_provider.clone();
734
40
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
735
40
        let (incoming_sender, incoming_receiver) =
736
40
            MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
737
40
        let (tx, rx) = oneshot::channel();
738
40

            
739
40
        self.command
740
40
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
741
40
                cmd_checker,
742
40
                incoming_sender,
743
40
                hop_num,
744
40
                done: tx,
745
40
                filter: Box::new(filter),
746
40
            })
747
40
            .map_err(|_| Error::CircuitClosed)?;
748

            
749
        // Check whether the AwaitStreamRequest was processed successfully.
750
40
        rx.await.map_err(|_| Error::CircuitClosed)??;
751

            
752
32
        let allowed_hop_num = hop_num;
753
32

            
754
32
        let circ = Arc::clone(self);
755
32
        Ok(incoming_receiver.map(move |req_ctx| {
756
24
            let StreamReqInfo {
757
24
                req,
758
24
                stream_id,
759
24
                hop_num,
760
24
                leg,
761
24
                receiver,
762
24
                msg_tx,
763
24
                memquota,
764
24
                relay_cell_format,
765
24
            } = req_ctx;
766
24

            
767
24
            // We already enforce this in handle_incoming_stream_request; this
768
24
            // assertion is just here to make sure that we don't ever
769
24
            // accidentally remove or fail to enforce that check, since it is
770
24
            // security-critical.
771
24
            assert_eq!(allowed_hop_num, hop_num);
772

            
773
            // TODO(#2002): figure out what this is going to look like
774
            // for onion services (perhaps we should forbid this function
775
            // from being called on a multipath circuit?)
776
            //
777
            // See also:
778
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
779
24
            let target = StreamTarget {
780
24
                circ: Arc::clone(&circ),
781
24
                tx: msg_tx,
782
24
                hop: HopLocation::Hop((leg, hop_num)),
783
24
                stream_id,
784
24
                relay_cell_format,
785
24
            };
786
24

            
787
24
            let reader = StreamReader {
788
24
                target: target.clone(),
789
24
                receiver,
790
24
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
791
24
                ended: false,
792
24
            };
793
24

            
794
24
            IncomingStream::new(circ.time_provider.clone(), req, target, reader, memquota)
795
32
        }))
796
44
    }
797

            
798
    /// Extend the circuit, via the most appropriate circuit extension handshake,
799
    /// to the chosen `target` hop.
800
    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
801
    where
802
        Tg: CircTarget,
803
    {
804
        // For now we use the simplest decision-making mechanism:
805
        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
806
        //
807
        // This behavior is slightly different from C tor, which uses ntor v3
808
        // only whenever it want to send any extension in the circuit message.
809
        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
810
        // want to use an extension if we can, and so it doesn't make too much
811
        // sense to detect the case where we have no extensions.
812
        //
813
        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
814
        // on the tor network, and so we cannot simply assume that everybody has it.)
815
        if target
816
            .protovers()
817
            .supports_named_subver(named::RELAY_NTORV3)
818
        {
819
            self.extend_ntor_v3(target, params).await
820
        } else {
821
            self.extend_ntor(target, params).await
822
        }
823
    }
824

            
825
    /// Extend the circuit via the ntor handshake to a new target last
826
    /// hop.
827
40
    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
828
40
    where
829
40
        Tg: CircTarget,
830
40
    {
831
40
        let key = NtorPublicKey {
832
40
            id: *target
833
40
                .rsa_identity()
834
40
                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
835
40
            pk: *target.ntor_onion_key(),
836
        };
837
40
        let mut linkspecs = target
838
40
            .linkspecs()
839
40
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
840
40
        if !params.extend_by_ed25519_id {
841
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
842
40
        }
843

            
844
40
        let (tx, rx) = oneshot::channel();
845
40

            
846
40
        let peer_id = OwnedChanTarget::from_chan_target(target);
847
40
        let settings =
848
40
            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
849
40
        self.control
850
40
            .unbounded_send(CtrlMsg::ExtendNtor {
851
40
                peer_id,
852
40
                public_key: key,
853
40
                linkspecs,
854
40
                settings,
855
40
                done: tx,
856
40
            })
857
40
            .map_err(|_| Error::CircuitClosed)?;
858

            
859
40
        rx.await.map_err(|_| Error::CircuitClosed)??;
860

            
861
8
        Ok(())
862
40
    }
863

            
864
    /// Extend the circuit via the ntor handshake to a new target last
865
    /// hop.
866
8
    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
867
8
    where
868
8
        Tg: CircTarget,
869
8
    {
870
8
        let key = NtorV3PublicKey {
871
8
            id: *target
872
8
                .ed_identity()
873
8
                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
874
8
            pk: *target.ntor_onion_key(),
875
        };
876
8
        let mut linkspecs = target
877
8
            .linkspecs()
878
8
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
879
8
        if !params.extend_by_ed25519_id {
880
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
881
8
        }
882

            
883
8
        let (tx, rx) = oneshot::channel();
884
8

            
885
8
        let peer_id = OwnedChanTarget::from_chan_target(target);
886
8
        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
887
8
        self.control
888
8
            .unbounded_send(CtrlMsg::ExtendNtorV3 {
889
8
                peer_id,
890
8
                public_key: key,
891
8
                linkspecs,
892
8
                settings,
893
8
                done: tx,
894
8
            })
895
8
            .map_err(|_| Error::CircuitClosed)?;
896

            
897
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
898

            
899
8
        Ok(())
900
8
    }
901

            
902
    /// Extend this circuit by a single, "virtual" hop.
    ///
    /// A virtual hop is one for which we do not add an actual network connection
    /// between separate hosts (such as Relays).  We only add a layer of
    /// cryptography.
    ///
    /// This is used to implement onion services: the client and the service
    /// both build a circuit to a single rendezvous point, and tell the
    /// rendezvous point to relay traffic between their two circuits.  Having
    /// completed a [`handshake`] out of band[^1], the parties each extend their
    /// circuits by a single "virtual" encryption hop that represents their
    /// shared cryptographic context.
    ///
    /// Once a circuit has been extended in this way, it is an error to try to
    /// extend it in any other way.
    ///
    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
    ///     client sends their half of the handshake in an `INTRODUCE1` message,
    ///     and the service's response is inline in its `RENDEZVOUS2` message.
    //
    // TODO hs: let's try to enforce the "you can't extend a circuit again once
    // it has been extended this way" property.  We could do that with internal
    // state, or some kind of a type state pattern.
    #[cfg(feature = "hs-common")]
    pub async fn extend_virtual(
        &self,
        protocol: handshake::RelayProtocol,
        role: handshake::HandshakeRole,
        seed: impl handshake::KeyGenerator,
        params: &CircParameters,
        capabilities: &tor_protover::Protocols,
    ) -> Result<()> {
        use self::handshake::BoxedClientLayer;

        let crypt_protocol = handshake::RelayCryptLayerProtocol::from(protocol);
        let relay_cell_format = crypt_protocol.relay_cell_format();

        // Derive the forward and backward crypto layers, plus the binding,
        // from the already-negotiated key material in `seed`.
        let BoxedClientLayer { fwd, back, binding } =
            crypt_protocol.construct_client_layers(role, seed)?;
        let cell_crypto = (fwd, back, binding);

        // TODO #2037: We _should_ support negotiation here; see #2037.
        let settings =
            HopSettings::from_params_and_caps(params, capabilities)?.without_negotiation();

        let (done_tx, done_rx) = oneshot::channel();
        self.command
            .unbounded_send(CtrlCmd::ExtendVirtual {
                relay_cell_format,
                cell_crypto,
                settings,
                done: done_tx,
            })
            .map_err(|_| Error::CircuitClosed)?;

        match done_rx.await {
            Ok(outcome) => outcome,
            // The sending half went away without ever reporting a result.
            Err(_) => Err(Error::CircuitClosed),
        }
    }
959

            
960
    /// Helper, used to begin a stream.
961
    ///
962
    /// This function allocates a stream ID, and sends the message
963
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
964
    ///
965
    /// The caller will typically want to see the first cell in response,
966
    /// to see whether it is e.g. an END or a CONNECTED.
967
60
    async fn begin_stream_impl(
968
60
        self: &Arc<ClientCirc>,
969
60
        begin_msg: AnyRelayMsg,
970
60
        cmd_checker: AnyCmdChecker,
971
90
    ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
972
60
        // TODO: Possibly this should take a hop, rather than just
973
60
        // assuming it's the last hop.
974
60
        let hop = TargetHop::LastHop;
975
60

            
976
60
        let time_prov = self.time_provider.clone();
977

            
978
60
        let memquota = StreamAccount::new(self.mq_account())?;
979
60
        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
980
60
            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
981
60
        let (tx, rx) = oneshot::channel();
982
60
        let (msg_tx, msg_rx) =
983
60
            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
984

            
985
60
        self.control
986
60
            .unbounded_send(CtrlMsg::BeginStream {
987
60
                hop,
988
60
                message: begin_msg,
989
60
                sender,
990
60
                rx: msg_rx,
991
60
                done: tx,
992
60
                cmd_checker,
993
60
            })
994
60
            .map_err(|_| Error::CircuitClosed)?;
995

            
996
60
        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
997

            
998
60
        let target = StreamTarget {
999
60
            circ: self.clone(),
60
            tx: msg_tx,
60
            hop,
60
            stream_id,
60
            relay_cell_format,
60
        };
60

            
60
        let reader = StreamReader {
60
            target: target.clone(),
60
            receiver,
60
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
60
            ended: false,
60
        };
60

            
60
        Ok((reader, target, memquota))
60
    }
    /// Start a DataStream (anonymized connection) to the given
    /// address and port, using a BEGIN cell.
60
    async fn begin_data_stream(
60
        self: &Arc<ClientCirc>,
60
        msg: AnyRelayMsg,
60
        optimistic: bool,
90
    ) -> Result<DataStream> {
60
        let (reader, target, memquota) = self
60
            .begin_stream_impl(msg, DataCmdChecker::new_any())
60
            .await?;
60
        let mut stream = DataStream::new(self.time_provider.clone(), reader, target, memquota);
60
        if !optimistic {
52
            stream.wait_for_connection().await?;
8
        }
60
        Ok(stream)
60
    }
    /// Start a stream to the given address and port, using a BEGIN
    /// cell.
    ///
    /// The use of a string for the address is intentional: you should let
    /// the remote Tor relay do the hostname lookup for you.
52
    pub async fn begin_stream(
52
        self: &Arc<ClientCirc>,
52
        target: &str,
52
        port: u16,
52
        parameters: Option<StreamParameters>,
78
    ) -> Result<DataStream> {
52
        let parameters = parameters.unwrap_or_default();
52
        let begin_flags = parameters.begin_flags();
52
        let optimistic = parameters.is_optimistic();
52
        let target = if parameters.suppressing_hostname() {
            ""
        } else {
52
            target
        };
52
        let beginmsg = Begin::new(target, port, begin_flags)
52
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
52
        self.begin_data_stream(beginmsg.into(), optimistic).await
52
    }
    /// Start a new stream to the last relay in the circuit, using
    /// a BEGIN_DIR cell.
12
    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
8
        // Note that we always open begindir connections optimistically.
8
        // Since they are local to a relay that we've already authenticated
8
        // with and built a circuit to, there should be no additional checks
8
        // we need to perform to see whether the BEGINDIR will succeed.
8
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
8
            .await
8
    }
    /// Look up `hostname` by sending a RESOLVE cell to the last relay in
    /// this circuit, and return the IP addresses it answers with.
    ///
    /// Answers that are neither IP addresses nor errors are skipped.  The
    /// first answer that represents an error makes the whole call fail.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
        let reply = self.try_resolve(Resolve::new(hostname)).await?;

        let mut addrs = Vec::new();
        for (answer, _ttl) in reply.into_answers() {
            // Error answers abort the lookup; anything that is not an IP
            // address is ignored.
            if let ResolvedVal::Ip(ip) = resolvedval_to_result(answer)? {
                addrs.push(ip);
            }
        }
        Ok(addrs)
    }
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
    /// the last relay on this circuit.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
        resolved_msg
            .into_answers()
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Hostname(v)) => Some(
                    String::from_utf8(v)
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
                ),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }
    /// Helper: open a resolve stream by sending `msg`, then read the
    /// resolved message back from that stream.
    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
        // The stream target is bound to a named (underscore-prefixed) variable
        // rather than discarded with `_`, so it stays alive until this
        // function returns.
        let (reader, _stream_target, memquota) = self
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
            .await?;

        let mut answers = ResolveStream::new(reader, memquota);
        answers.read_msg().await
    }
    /// Ask this circuit, and every stream using it, to shut down.
    ///
    /// This only queues a shutdown request: the circuit won't necessarily be
    /// done shutting down by the time this function returns.
    ///
    /// Note that other references to this circuit may exist.  If they
    /// do, they will stop working after you call this function.
    ///
    /// It's not necessary to call this method if you're just done
    /// with a circuit: the circuit should close on its own once nothing
    /// is using it any more.
    pub fn terminate(&self) {
        let shutdown = CtrlCmd::Shutdown;
        // A failure to queue the command is deliberately ignored.
        let _ = self.command.unbounded_send(shutdown);
    }
    /// Called when a circuit-level protocol error has occurred and the
    /// circuit needs to shut down.
    ///
    /// This is a separate function because we may eventually want to have
    /// it do more than just shut down.
    ///
    /// As with `terminate`, this function is asynchronous.
    pub(crate) fn protocol_error(&self) {
        self.terminate();
    }
    /// Return true if this circuit is closed and therefore unusable.
80
    pub fn is_closing(&self) -> bool {
80
        self.control.is_closed()
80
    }
    /// Return a process-unique identifier for this circuit.
    pub fn unique_id(&self) -> UniqId {
        self.unique_id
    }
    /// Return the number of hops in this circuit.
    ///
    /// NOTE: This function will currently return only the number of hops
    /// _currently_ in the circuit. If there is an extend operation in progress,
    /// the currently pending hop may or may not be counted, depending on whether
    /// the extend operation finishes before this call is done.
80
    pub fn n_hops(&self) -> Result<usize> {
80
        self.mutable
80
            .n_hops(self.unique_id)
80
            .map_err(|_| Error::CircuitClosed)
80
    }
    /// Return a future that resolves once this circuit has closed.
    ///
    /// Calling this method does not itself cause the circuit to shut down;
    /// it only lets the caller wait for that to happen.
    ///
    /// TODO: Perhaps this should return some kind of status indication instead
    /// of just ()
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        // Clone the stored "reactor closed" future so that the returned value
        // does not borrow from `self`, then discard whatever it yields.
        let closed = self.reactor_closed_rx.clone();
        closed.map(|_| ())
    }
}
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientCirc::start_conversation`],
/// and used to send messages to the last hop relay.
///
/// The handle borrows the [`ClientCirc`] it was created from (lifetime `'r`),
/// so it cannot outlive that borrow.
//
// NOTE(review): `start_conversation` takes an arbitrary `hop_num` for its
// reply handler, so "last hop" above may be narrower than what the code
// allows -- confirm against the reactor's handling of
// `SendMsgAndInstallHandler`.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send one more protocol message as part of this ad-hoc exchange
    ///
    /// No new handler is installed: responses are handled by the `MsgHandler`
    /// set up when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        let no_new_handler = None;
        self.send_internal(Some(msg), no_new_handler).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`.
    /// Both `msg` and `handler` are optional and are passed along as given.
    ///
    /// Returns [`Error::CircuitClosed`] if the control message cannot be
    /// queued, or if the reply channel is dropped before an outcome arrives.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let circ: &ClientCirc = self.0;

        // The message is wrapped without a stream ID (`None`).
        let outer_msg =
            msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner));

        let (outcome_tx, outcome_rx) = oneshot::channel();
        circ.control
            .unbounded_send(CtrlMsg::SendMsgAndInstallHandler {
                msg: outer_msg,
                handler,
                sender: outcome_tx,
            })
            .map_err(|_| Error::CircuitClosed)?;

        match outcome_rx.await {
            Ok(outcome) => outcome,
            // The sending half went away without ever reporting a result.
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
impl PendingClientCirc {
    /// Instantiate a new circuit object: used from Channel::new_circ().
    ///
    /// Does not send a CREATE* cell on its own.
    ///
    ///
256
    pub(crate) fn new(
256
        id: CircId,
256
        channel: Arc<Channel>,
256
        createdreceiver: oneshot::Receiver<CreateResponse>,
256
        input: CircuitRxReceiver,
256
        unique_id: UniqId,
256
        runtime: DynTimeProvider,
256
        memquota: CircuitAccount,
256
    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
256
        let time_provider = channel.time_provider().clone();
256
        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
256
            Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
256

            
256
        let circuit = ClientCirc {
256
            mutable,
256
            unique_id,
256
            control: control_tx,
256
            command: command_tx,
256
            reactor_closed_rx: reactor_closed_rx.shared(),
256
            #[cfg(test)]
256
            circid: id,
256
            memquota,
256
            time_provider,
256
        };
256

            
256
        let pending = PendingClientCirc {
256
            recvcreated: createdreceiver,
256
            circ: Arc::new(circuit),
256
        };
256
        (pending, reactor)
256
    }
    /// Return the process-unique identifier of this pending circuit, without
    /// consuming it.
    pub fn peek_unique_id(&self) -> UniqId {
        self.circ.unique_id()
    }
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
    /// first hop of this circuit.
    ///
    /// There's no authentication in CRATE_FAST,
    /// so we don't need to know whom we're connecting to: we're just
    /// connecting to whichever relay the channel is for.
12
    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
8
        // We no nothing about this relay, so we assume it supports no protocol capabilities at all.
8
        //
8
        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
8
        let protocols = tor_protover::Protocols::new();
8
        let settings =
8
            HopSettings::from_params_and_caps(&params, &protocols)?.without_negotiation();
8

            
8
        let (tx, rx) = oneshot::channel();
8
        self.circ
8
            .control
8
            .unbounded_send(CtrlMsg::Create {
8
                recv_created: self.recvcreated,
8
                handshake: CircuitHandshake::CreateFast,
8
                settings,
8
                done: tx,
8
            })
8
            .map_err(|_| Error::CircuitClosed)?;
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
8
        Ok(self.circ)
8
    }
    /// Connect to the first hop of this circuit, choosing the handshake
    /// automatically: `ntor_v3` when `target` advertises support for it,
    /// and plain `ntor` otherwise.
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
    pub async fn create_firsthop<Tg>(
        self,
        target: &Tg,
        params: CircParameters,
    ) -> Result<Arc<ClientCirc>>
    where
        Tg: tor_linkspec::CircTarget,
    {
        // (See note in ClientCirc::extend.)
        let has_ntor_v3 = target
            .protovers()
            .supports_named_subver(named::RELAY_NTORV3);

        if !has_ntor_v3 {
            return self.create_firsthop_ntor(target, params).await;
        }
        self.create_firsthop_ntor_v3(target, params).await
    }
    /// Use the ntor handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
8
    pub async fn create_firsthop_ntor<Tg>(
8
        self,
8
        target: &Tg,
8
        params: CircParameters,
8
    ) -> Result<Arc<ClientCirc>>
8
    where
8
        Tg: tor_linkspec::CircTarget,
8
    {
8
        let (tx, rx) = oneshot::channel();
8
        let settings =
8
            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
8

            
8
        self.circ
8
            .control
8
            .unbounded_send(CtrlMsg::Create {
8
                recv_created: self.recvcreated,
8
                handshake: CircuitHandshake::Ntor {
8
                    public_key: NtorPublicKey {
8
                        id: *target
8
                            .rsa_identity()
8
                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
8
                        pk: *target.ntor_onion_key(),
8
                    },
8
                    ed_identity: *target
8
                        .ed_identity()
8
                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
                },
8
                settings,
8
                done: tx,
8
            })
8
            .map_err(|_| Error::CircuitClosed)?;
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
8
        Ok(self.circ)
8
    }
    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
    ///
    /// Assumes that the target supports ntor_v3. The caller should verify
    /// this before calling this function, e.g. by validating that the target
    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
16
    pub async fn create_firsthop_ntor_v3<Tg>(
16
        self,
16
        target: &Tg,
16
        params: CircParameters,
16
    ) -> Result<Arc<ClientCirc>>
16
    where
16
        Tg: tor_linkspec::CircTarget,
16
    {
16
        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
16
        let (tx, rx) = oneshot::channel();
16

            
16
        self.circ
16
            .control
16
            .unbounded_send(CtrlMsg::Create {
16
                recv_created: self.recvcreated,
16
                handshake: CircuitHandshake::NtorV3 {
16
                    public_key: NtorV3PublicKey {
16
                        id: *target
16
                            .ed_identity()
16
                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
16
                        pk: *target.ntor_onion_key(),
16
                    },
16
                },
16
                settings,
16
                done: tx,
16
            })
16
            .map_err(|_| Error::CircuitClosed)?;
16
        rx.await.map_err(|_| Error::CircuitClosed)??;
16
        Ok(self.circ)
16
    }
}
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
/// it represents an error.
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
    match val {
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
        _ => Ok(val),
    }
}
#[cfg(test)]
pub(crate) mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::channel::OpenChanCellS2C;
    use crate::channel::{test::new_reactor, CodecError};
    use crate::congestion::test_utils::params::build_cc_vegas_params;
    use crate::crypto::cell::RelayCellBody;
    use crate::crypto::handshake::ntor_v3::NtorV3Server;
    #[cfg(feature = "hs-service")]
    use crate::stream::IncomingStreamRequestFilter;
    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
    use futures::channel::mpsc::{Receiver, Sender};
    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::sink::SinkExt;
    use futures::stream::StreamExt;
    use futures::task::SpawnExt;
    use hex_literal::hex;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Debug;
    use std::time::Duration;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
    use tor_cell::relaycell::msg::SendmeTag;
    use tor_cell::relaycell::{
        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
    };
    use tor_linkspec::OwnedCircTarget;
    use tor_memquota::HasMemoryCost;
    use tor_rtcompat::Runtime;
    use tracing::trace;
    use tracing_test::traced_test;
    #[cfg(feature = "conflux")]
    use {
        crate::tunnel::reactor::ConfluxHandshakeResult,
        crate::util::err::ConfluxHandshakeError,
        std::result::Result as StdResult,
        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
        tor_cell::relaycell::msg::ConfluxLink,
        tor_rtmock::MockRuntime,
    };
    impl PendingClientCirc {
        /// Testing only: report the circuit ID of the circuit that is
        /// still being built.
        pub(crate) fn peek_circid(&self) -> CircId {
            let Self { circ, .. } = self;
            circ.circid
        }
    }
    impl ClientCirc {
        /// Testing only: report this circuit's ID.
        pub(crate) fn peek_circid(&self) -> CircId {
            let Self { circid, .. } = self;
            *circid
        }
    }
    /// Wrap a relay message (optionally tagged with a stream ID) in a RELAY
    /// channel message, as a test would feed it to a circuit.
    ///
    /// The message is encoded with [`RelayCellFormat::V0`] using the testing
    /// RNG; an encoding failure panics.
    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
        // TODO #1947: test other formats.
        let outer = AnyRelayMsgOuter::new(id, msg);
        let encoded: BoxedCellBody = outer
            .encode(RelayCellFormat::V0, &mut testing_rng())
            .unwrap();
        ClientCircChanMsg::Relay(chanmsg::Relay::from(encoded))
    }
    // Example relay IDs and keys
    /// Secret-key bytes passed to the example ntor and ntor_v3 secret keys.
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Public-key bytes used as the example target's ntor onion key
    /// (presumably the public half of `EXAMPLE_SK` -- TODO confirm).
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Ed25519 identity bytes of the example target.
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// RSA identity bytes of the example target.
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
    /// Build a fake MPSC queue for tests, with the same sender/receiver
    /// types that Channels use.
    ///
    /// Simply forwards `buffer` to [`crate::fake_mpsc`].
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T>(buffer: usize) -> (StreamMpscSender<T>, StreamMpscReceiver<T>)
    where
        T: HasMemoryCost + Debug + Send,
    {
        let (sender, receiver) = crate::fake_mpsc(buffer);
        (sender, receiver)
    }
    /// Return an example [`OwnedCircTarget`] suitable for an ntor handshake.
    ///
    /// It carries the example Ed25519 and RSA identities, the example ntor
    /// onion key, and advertises the protocols `FlowCtrl=1-2`.
    fn example_target() -> OwnedCircTarget {
        let mut target = OwnedCircTarget::builder();
        {
            // Identities live on the nested channel-target builder.
            let ids = target.chan_target();
            ids.ed_identity(EXAMPLE_ED_ID.into());
            ids.rsa_identity(EXAMPLE_RSA_ID.into());
        }
        target.ntor_onion_key(EXAMPLE_PK.into());
        target.protocols("FlowCtrl=1-2".parse().unwrap());
        target.build().unwrap()
    }
    /// Build the ntor secret key a fake relay uses to answer handshakes
    /// aimed at the example target: example secret key, example public key,
    /// and example RSA identity.
    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
        use crate::crypto::handshake::ntor::NtorSecretKey;

        let (secret, public, relay_id) = (EXAMPLE_SK, EXAMPLE_PK, EXAMPLE_RSA_ID);
        NtorSecretKey::new(secret.into(), public.into(), relay_id.into())
    }
    /// Build the ntor_v3 secret key a fake relay uses to answer handshakes
    /// aimed at the example target: example secret key, example public key,
    /// and example Ed25519 identity.
    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
        use crate::crypto::handshake::ntor_v3::NtorV3SecretKey;

        let (secret, public, relay_id) = (EXAMPLE_SK, EXAMPLE_PK, EXAMPLE_ED_ID);
        NtorV3SecretKey::new(secret.into(), public.into(), relay_id.into())
    }
    /// Create a fake channel whose reactor is already running on `rt`.
    ///
    /// Returns the channel together with the receiver and sender handed out
    /// by `new_reactor`.  Tests in this module read the cells that circuits
    /// emit from the receiver; the sender presumably injects incoming cells.
    /// Whatever the reactor's `run` returns is ignored.
    fn working_fake_channel<R: Runtime>(
        rt: &R,
    ) -> (
        Arc<Channel>,
        Receiver<AnyChanCell>,
        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    ) {
        let (chan, reactor, cells_rx, cells_tx) = new_reactor(rt.clone());
        let reactor_task = async {
            let _ignore = reactor.run().await;
        };
        rt.spawn(reactor_task).unwrap();
        (chan, cells_rx, cells_tx)
    }
    /// Which handshake type to use.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// CREATE_FAST; only usable for the first hop (the extend test
        /// panics when asked to use it).
        Fast,
        /// The ntor handshake.
        Ntor,
        /// The ntor_v3 handshake.
        NtorV3,
    }
    /// Build a one-hop circuit with the given `handshake_type`, playing both
    /// the client and the relay.
    ///
    /// `with_cc` only matters for [`HandshakeType::NtorV3`]: the client then
    /// uses the Vegas congestion-control test parameters, and the fake relay
    /// insists on seeing a CC request extension and answers with a CC
    /// response.
    ///
    /// Panics if any step of the handshake fails or if the resulting circuit
    /// does not have exactly one hop.
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
        // We want to try progressing from a pending circuit to a circuit
        // via the requested handshake type.
        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);
        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // Future to pretend to be a relay on the other end of the circuit.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            // The first cell on the channel must be the create request for our circuit.
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), Some(circid));
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    // Both branches are non-capturing closures, so they
                    // unify to a single type.
                    let mut reply_fn = if with_cc {
                        |client_exts: &[CircRequestExt]| {
                            let _ = client_exts
                                .iter()
                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
                                .expect("Client failed to request CC");
                            // This needs to be aligned to test_utils params
                            // value due to validation that needs it in range.
                            Some(vec![CircResponseExt::CcResponse(
                                extend_ext::CcResponse::new(31),
                            )])
                        }
                    } else {
                        |_: &_| Some(vec![])
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut reply_fn,
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            created_send.send(reply).unwrap();
        };
        // Future to pretend to be a client.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                HandshakeType::NtorV3 => {
                    let params = if with_cc {
                        // Setup CC vegas parameters.
                        CircParameters::new(true, build_cc_vegas_params())
                    } else {
                        params
                    };
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };
        // Drive client and relay together; each one waits on the other.
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
        let _circ = circ.unwrap();
        // Phew!  We've built a circuit!  Let's make sure it has one hop.
        assert_eq!(_circ.n_hops().unwrap(), 1);
    }
    /// First-hop creation with the CREATE_FAST handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    /// First-hop creation with the ntor handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    /// First-hop creation with the ntor_v3 handshake (no congestion
    /// control), on every runtime.
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
    /// First-hop creation with the ntor_v3 handshake while negotiating
    /// congestion control; only built with the `flowctl-cc` feature.
    #[traced_test]
    #[test]
    #[cfg(feature = "flowctl-cc")]
    fn test_create_ntor_v3_with_cc() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, true).await;
        });
    }
    // An encryption layer that doesn't do any crypto.   Can be used
    // as inbound or outbound, but not both at once.
    //
    // Instead of authenticating anything, it hands out tags derived from a
    // running counter.
    pub(crate) struct DummyCrypto {
        // Scratch buffer for the next tag; its first four bytes hold
        // `counter` in little-endian order, the rest stay zero.
        counter_tag: [u8; 20],
        // Number of tags produced so far.
        counter: u32,
        // Whether inbound cells should be reported as originating at this
        // layer (i.e. `decrypt_inbound` yields a tag).
        lasthop: bool,
    }
    impl DummyCrypto {
        /// Produce the next fake tag and advance the counter.
        ///
        /// The current counter value is written, little-endian, into the
        /// first four bytes of the tag buffer; the buffer is then converted
        /// into a [`SendmeTag`].
        fn next_tag(&mut self) -> SendmeTag {
            let le = self.counter.to_le_bytes();
            self.counter_tag[..le.len()].copy_from_slice(&le);
            self.counter += 1;
            self.counter_tag.into()
        }
    }
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        /// Leave the cell untouched and return the next counter-based tag.
        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
            Self::next_tag(self)
        }
        /// No-op: this layer performs no encryption.
        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {
            // Intentionally empty.
        }
    }
    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
        /// Leave the cell untouched.  Yields the next counter-based tag when
        /// this layer was created with `lasthop == true`, and `None`
        /// otherwise (in which case the counter does not advance).
        fn decrypt_inbound(
            &mut self,
            _cmd: ChanCmd,
            _cell: &mut RelayCellBody,
        ) -> Option<SendmeTag> {
            if !self.lasthop {
                return None;
            }
            Some(self.next_tag())
        }
    }
    impl DummyCrypto {
        pub(crate) fn new(lasthop: bool) -> Self {
            DummyCrypto {
                counter_tag: [0; 20],
                counter: 0,
                lasthop,
            }
        }
    }
    // Helper: set up a circuit with no encryption, with one fake hop per
    // entry in `hops`, where the next inbound message seems to come from
    // hop next_msg_from.
    //
    // Returns the circuit and the sender used to inject inbound circuit
    // messages.  Panics if `hops` is empty or has more than 256 entries, or
    // if the reactor rejects a hop.
    async fn newcirc_ext<R: Runtime>(
        rt: &R,
        unique_id: UniqId,
        chan: Arc<Channel>,
        hops: Vec<path::HopDetail>,
        next_msg_from: HopNum,
        params: CircParameters,
    ) -> (Arc<ClientCirc>, CircuitRxSender) {
        let circid = CircId::new(128).unwrap();
        let (_created_send, created_recv) = oneshot::channel();
        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // No real handshake happens here, so the "created" receiver is unused.
        let PendingClientCirc {
            circ,
            recvcreated: _,
        } = pending;
        // TODO #1067: Support other formats
        let relay_cell_format = RelayCellFormat::V0;
        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
        for (idx, peer_id) in hops.into_iter().enumerate() {
            let (tx, rx) = oneshot::channel();
            // Cannot truncate: `last_hop_num` above already proved that every
            // index fits in a u8.
            let idx = idx as u8;
            circ.command
                .unbounded_send(CtrlCmd::AddFakeHop {
                    relay_cell_format,
                    fwd_lasthop: idx == last_hop_num,
                    rev_lasthop: idx == u8::from(next_msg_from),
                    peer_id,
                    params: params.clone(),
                    done: tx,
                })
                .unwrap();
            // Wait for this hop to be added before sending the next one.
            rx.await.unwrap().unwrap();
        }
        (circ, circmsg_send)
    }
    // Helper: set up a 3-hop circuit with no encryption and default
    // parameters, where the next inbound message seems to come from the
    // last hop (hop 2).  All three hops share the same dummy identities.
    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
        let make_hop = || {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");
            path::HopDetail::Relay(peer_id)
        };
        let hops: Vec<path::HopDetail> = (0..3).map(|_| make_hop()).collect();
        let unique_id = UniqId::new(23, 17);
        let circ_params = CircParameters::default();
        newcirc_ext(rt, unique_id, chan, hops, 2.into(), circ_params).await
    }
    /// Build `n` distinct relay [`path::HopDetail`]s with dummy identities.
    ///
    /// Hop `i` gets an Ed25519 identity filled with the byte `i + start_idx`
    /// and an RSA identity filled with `i + start_idx + 1`.
    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
        let mut hops = Vec::with_capacity(usize::from(n));
        for offset in 0..n {
            let id_byte = offset + start_idx;
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([id_byte; 32].into())
                .rsa_identity([id_byte + 1; 20].into())
                .build()
                .expect("Could not construct fake hop");
            hops.push(path::HopDetail::Relay(peer_id));
        }
        hops
    }
    /// Extend a fake 3-hop circuit by one hop using `handshake_type`,
    /// playing both the client and the relay that answers the EXTEND2.
    ///
    /// Afterwards checks that the circuit has four hops and that the path
    /// accessors report the example target as the last one.  Panics when
    /// asked to use [`HandshakeType::Fast`].
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let params = CircParameters::default();
        // Client side: ask the circuit to extend to the example target.
        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
            };
            circ // gotta keep the circ alive, or the reactor would exit.
        };
        // Relay side: read the EXTEND2 and answer with an EXTENDED2.
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[CircRequestExt]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };
            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
        };
        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
        // Did we really add another hop?
        assert_eq!(circ.n_hops().unwrap(), 4);
        // Do the path accessors report a reasonable outcome?
        {
            // First via the `all_hops` iterator...
            let path = circ.path_ref().unwrap();
            let path = path
                .all_hops()
                .filter_map(|hop| match hop {
                    path::HopDetail::Relay(r) => Some(r),
                    #[cfg(feature = "hs-common")]
                    path::HopDetail::Virtual => None,
                })
                .collect::<Vec<_>>();
            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        {
            // ...then via `n_hops` and the `hops` slice.
            let path = circ.path_ref().unwrap();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
    /// Circuit extension with the ntor handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_extend_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::Ntor).await;
        });
    }
    /// Circuit extension with the ntor_v3 handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_extend_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::NtorV3).await;
        });
    }
    /// Shared driver for the "extend goes wrong" tests.
    ///
    /// Builds a fake 3-hop circuit on which inbound messages appear to come
    /// from `reply_hop`, starts an ntor extend, and has a background task
    /// deliver `bad_reply` 100ms later.  Asserts that the circuit still has
    /// three hops and that the extend failed, then returns the error so the
    /// caller can check which one it was.
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: ClientCircChanMsg,
    ) -> Error {
        let (chan, _rx, _sink) = working_fake_channel(rt);
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");
            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();
        let unique_id = UniqId::new(23, 17);
        let (circ, mut sink) = newcirc_ext(
            rt,
            unique_id,
            chan,
            hops,
            reply_hop,
            CircParameters::default(),
        )
        .await;
        let params = CircParameters::default();
        let target = example_target();
        #[allow(clippy::clone_on_copy)]
        let rtc = rt.clone();
        // Deliver the bad reply from a separate task, after a short delay,
        // while the extend below is waiting for an answer.
        let sink_handle = rt
            .spawn_with_handle(async move {
                rtc.sleep(Duration::from_millis(100)).await;
                sink.send(bad_reply).await.unwrap();
                sink
            })
            .unwrap();
        let outcome = circ.extend_ntor(&target, params).await;
        // Wait for the sender task to finish before inspecting the circuit.
        let _sink = sink_handle.await;
        assert_eq!(circ.n_hops().unwrap(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }
    /// An EXTENDED2 that appears to come from hop 1 instead of the last hop
    /// must make the extend fail with `CircuitClosed`.
    #[traced_test]
    #[test]
    fn bad_extend_wronghop() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![]).into();
            let cc = rmsg_to_ccmsg(None, extended2);
            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
            // This case shows up as a CircuitClosed error, since a message
            // sent from the wrong hop won't even be delivered to the extend
            // code's meta-handler.  Instead the unexpected message will cause
            // the circuit to get torn down.
            match error {
                Error::CircuitClosed => {}
                x => panic!("got other error: {}", x),
            }
        });
    }
    /// Answering an ntor extend with an EXTENDED message (rather than
    /// EXTENDED2) must surface as an invalid-message bytes error for the
    /// "extended2 message" object.
    #[traced_test]
    #[test]
    fn bad_extend_wrongtype() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended = relaymsg::Extended::new(vec![7; 200]).into();
            let cc = rmsg_to_ccmsg(None, extended);
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::BytesErr {
                    err: tor_bytes::Error::InvalidMessage(_),
                    object: "extended2 message",
                } => {}
                other => panic!("{:?}", other),
            }
        });
    }
    /// A DESTROY arriving while an extend is pending must make the extend
    /// fail with `CircuitClosed`.
    #[traced_test]
    #[test]
    fn bad_extend_destroy() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::CircuitClosed => {}
                other => panic!("{:?}", other),
            }
        });
    }
    /// An EXTENDED2 from the right hop whose handshake body is junk must be
    /// rejected with `BadCircHandshakeAuth`.
    #[traced_test]
    #[test]
    fn bad_extend_crypto() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
            let cc = rmsg_to_ccmsg(None, extended2);
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            assert!(matches!(error, Error::BadCircHandshakeAuth));
        });
    }
    /// Open a BEGINDIR stream on a fake circuit and run a tiny
    /// request/response exchange over it: the client writes a request, the
    /// fake relay answers CONNECTED, one DATA cell, then END, and the client
    /// must read exactly that data followed by end-of-stream.
    #[traced_test]
    #[test]
    fn begindir() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            let circid = circ.peek_circid();
            let begin_and_send_fut = async move {
                // Here we'll say we've got a circuit, and we want to
                // make a simple BEGINDIR request with it.
                let mut stream = circ.begin_dir_stream().await.unwrap();
                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
                stream.flush().await.unwrap();
                let mut buf = [0_u8; 1024];
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
                // A zero-length read means the stream has ended.
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(n, 0);
                stream
            };
            let reply_fut = async move {
                // We've disabled encryption on this circuit, so we can just
                // read the begindir cell.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
                // Reply with a Connected cell to indicate success.
                let connected = relaymsg::Connected::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Now read a DATA cell...
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(streamid_2, streamid);
                if let AnyRelayMsg::Data(d) = rmsg {
                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
                } else {
                    panic!();
                }
                // Write another data cell in reply!
                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
                    .unwrap()
                    .into();
                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
                // Send an END cell to say that the conversation is over.
                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
                (rx, sink) // gotta keep these alive, or the reactor will exit.
            };
            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
        });
    }
    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
    //
    // In both cases the fake relay must see a BEGIN, and (after replying
    // CONNECTED) an END.
    fn close_stream_helper(by_drop: bool) {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            let stream_fut = async move {
                let stream = circ
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();
                let (r, mut w) = stream.split();
                if by_drop {
                    // Drop the writer and the reader, which should close the stream.
                    drop(r);
                    drop(w);
                    (None, circ) // make sure to keep the circuit alive
                } else {
                    // Call close on the writer, while keeping the reader alive.
                    w.close().await.unwrap();
                    (Some(r), circ)
                }
            };
            let handler_fut = async {
                // Read the BEGIN message.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
                // Reply with a CONNECTED.
                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Expect an END.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (_, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::END);
                (rx, sink) // keep these alive or the reactor will exit.
            };
            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
        });
    }
    /// Closing a stream by dropping both of its halves must send an END.
    #[traced_test]
    #[test]
    fn drop_stream() {
        let by_drop = true;
        close_stream_helper(by_drop);
    }
    /// Closing a stream by calling `close` on its writer must send an END.
    #[traced_test]
    #[test]
    fn close_stream() {
        let by_drop = false;
        close_stream_helper(by_drop);
    }
    // Set up a circuit and stream that expects some incoming SENDMEs.
    async fn setup_incoming_sendme_case<R: Runtime>(
        rt: &R,
        n_to_send: usize,
    ) -> (
        Arc<ClientCirc>,
        DataStream,
        CircuitRxSender,
        Option<StreamId>,
        usize,
        Receiver<AnyChanCell>,
        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    ) {
        let (chan, mut rx, sink2) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let begin_and_send_fut = {
            let circ = circ.clone();
            async move {
                // Take our circuit and make a stream on it.
                let mut stream = circ
                    .begin_stream("www.example.com", 443, None)
                    .await
                    .unwrap();
                let junk = [0_u8; 1024];
                let mut remaining = n_to_send;
                while remaining > 0 {
                    let n = std::cmp::min(remaining, junk.len());
                    stream.write_all(&junk[..n]).await.unwrap();
                    remaining -= n;
                }
                stream.flush().await.unwrap();
                stream
            }
        };
        let receive_fut = async move {
            // Read the begin cell.
            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            let rmsg = match chmsg {
                AnyChanMsg::Relay(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
            // Reply with a connected cell...
            let connected = relaymsg::Connected::new_empty().into();
            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
            // Now read bytes from the stream until we have them all.
            let mut bytes_received = 0_usize;
            let mut cells_received = 0_usize;
            while bytes_received < n_to_send {
                // Read a data cell, and remember how much we got.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(streamid2, streamid);
                if let AnyRelayMsg::Data(dat) = rmsg {
                    cells_received += 1;
                    bytes_received += dat.as_ref().len();
                } else {
                    panic!();
                }
            }
            (sink, streamid, cells_received, rx)
        };
        let (stream, (sink, streamid, cells_received, rx)) =
            futures::join!(begin_and_send_fut, receive_fut);
        (circ, stream, sink, streamid, cells_received, rx, sink2)
    }
    // After 301 cells have been sent, a circuit-level SENDME carrying the first
    // expected tag (followed by a stream-level SENDME) must be accepted, and the
    // hop's send window must grow from `1000 - 301` to `1000 - 201`.
    #[traced_test]
    #[test]
    fn accept_valid_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
            assert_eq!(cells_received, 301);

            // The tags the circuit should currently be waiting for. Their
            // leading bytes are the little-endian encodings of 100, 200 and 300.
            let expected_tags = [
                hex!("6400000000000000000000000000000000000000"),
                hex!("c800000000000000000000000000000000000000"),
                hex!("2c01000000000000000000000000000000000000"),
            ];

            // Ask the reactor what it believes about hop 2 before any SENDME
            // has arrived.
            {
                let (done, reply) = oneshot::channel();
                let query = CtrlCmd::QuerySendWindow {
                    hop: 2.into(),
                    done,
                };
                circ.command.unbounded_send(query).unwrap();
                let (window, tags) = reply.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 301);
                assert_eq!(tags.len(), 3);
                for (idx, tag) in expected_tags.into_iter().enumerate() {
                    assert_eq!(tags[idx], SendmeTag::from(tag));
                }
            }

            // Circuit-level SENDME, acknowledging the first tag.
            let c_sendme =
                relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
                    .into();
            sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
            // Stream-level SENDME.
            let s_sendme = relaymsg::Sendme::new_empty().into();
            sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();

            rt.advance_until_stalled().await;

            // The reactor must still be answering queries, and the window must
            // reflect the acknowledged cells.
            {
                let (done, reply) = oneshot::channel();
                let query = CtrlCmd::QuerySendWindow {
                    hop: 2.into(),
                    done,
                };
                circ.command.unbounded_send(query).unwrap();
                let (window, _tags) = reply.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 201);
            }
        });
    }
    // Counterpart of `accept_valid_sendme`: a circuit-level SENDME whose tag
    // matches nothing the circuit sent must leave the circuit closing.
    #[traced_test]
    #[test]
    fn invalid_circ_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;

            // Deliver a circuit-level SENDME with a bogus tag.
            let bad_sendme =
                relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
                    .into();
            sink.send(rmsg_to_ccmsg(None, bad_sendme)).await.unwrap();

            // Give the reactor a chance to process the cell, then check that
            // the invalid data made the circuit shut down.
            rt.advance_until_stalled().await;
            assert!(circ.is_closing());
        });
    }
    // Check that when several streams on one circuit all have data queued, the
    // DATA cells that reach the channel are spread roughly evenly across the
    // streams: once `N_BYTES` have been received in total, every stream must
    // have contributed at least `MIN_EXPECTED_BYTES_PER_STREAM`.
    #[traced_test]
    #[test]
    fn test_busy_stream_fairness() {
        // Number of streams to use.
        const N_STREAMS: usize = 3;
        // Number of cells (roughly) for each stream to send.
        const N_CELLS: usize = 20;
        // Number of bytes that *each* stream will send, and that we'll read
        // from the channel.
        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
        // Ignoring cell granularity, with perfect fairness we'd expect
        // `N_BYTES/N_STREAMS` bytes from each stream.
        //
        // We currently allow for up to a full cell less than that.  This is
        // somewhat arbitrary and can be changed as needed, since we don't
        // provide any specific fairness guarantees.
        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            // Run clients in a single task, doing our own round-robin
            // scheduling of writes to the reactor. Conversely, if we were to
            // put each client in its own task, we would be at the mercy of
            // how fairly the runtime schedules the client tasks, which is outside
            // the scope of this test.
            rt.spawn({
                // Clone the circuit to keep it alive after writers have
                // finished with it.
                let circ = circ.clone();
                async move {
                    let mut clients = VecDeque::new();
                    // One writer: its stream plus the bytes it still has to send.
                    struct Client {
                        stream: DataStream,
                        to_write: &'static [u8],
                    }
                    for _ in 0..N_STREAMS {
                        clients.push_back(Client {
                            stream: circ
                                .begin_stream("www.example.com", 80, None)
                                .await
                                .unwrap(),
                            to_write: &[0_u8; N_BYTES][..],
                        });
                    }
                    // Round-robin: take the client at the front, let it do a
                    // single `write`, and queue it again at the back until it
                    // has nothing left to send.
                    while let Some(mut client) = clients.pop_front() {
                        if client.to_write.is_empty() {
                            // Client is done. Don't put back in queue.
                            continue;
                        }
                        let written = client.stream.write(client.to_write).await.unwrap();
                        client.to_write = &client.to_write[written..];
                        clients.push_back(client);
                    }
                }
            })
            .unwrap();
            // The "relay" side: answer each BEGIN with CONNECTED and tally the
            // DATA bytes per stream until `N_BYTES` have arrived in total.
            let channel_handler_fut = async {
                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
                let mut total_bytes_received = 0;
                loop {
                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                    let rmsg = match msg {
                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
                            RelayCellFormat::V0,
                            r.into_relay_body(),
                        )
                        .unwrap(),
                        other => panic!("Unexpected chanmsg: {other:?}"),
                    };
                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                    match rmsg.cmd() {
                        RelayCmd::BEGIN => {
                            // Add an entry for this stream.
                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
                            assert_eq!(prev, None);
                            // Reply with a CONNECTED.
                            let connected = relaymsg::Connected::new_with_addr(
                                "10.0.0.1".parse().unwrap(),
                                1234,
                            )
                            .into();
                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                        }
                        RelayCmd::DATA => {
                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
                            let nbytes = data_msg.as_ref().len();
                            total_bytes_received += nbytes;
                            let streamid = streamid.unwrap();
                            // A DATA cell for a stream we never saw a BEGIN for
                            // fails the test here.
                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
                            *stream_bytes += nbytes;
                            // Stop once one stream's worth of bytes has arrived
                            // in total; the per-stream split is checked below.
                            if total_bytes_received >= N_BYTES {
                                break;
                            }
                        }
                        RelayCmd::END => {
                            // Stream is done. If fair scheduling is working as
                            // expected we *probably* shouldn't get here, but we
                            // can ignore it and save the failure until we
                            // actually have the final stats.
                            continue;
                        }
                        other => {
                            panic!("Unexpected command {other:?}");
                        }
                    }
                }
                // Return our stats, along with the `rx` and `sink` to keep the
                // reactor alive (since clients could still be writing).
                (total_bytes_received, stream_bytes_received, rx, sink)
            };
            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
                channel_handler_fut.await;
            // Every stream must have been opened, and each must have delivered
            // at least its minimum share.
            assert_eq!(stream_bytes_received.len(), N_STREAMS);
            for (sid, stream_bytes) in stream_bytes_received {
                assert!(
                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
                );
            }
        });
    }
    // `CircParameters::default()` enables `extend_by_ed25519_id`, and the field
    // can be switched off afterwards.
    #[test]
    fn basic_params() {
        use super::CircParameters;

        let mut params = CircParameters::default();
        assert!(params.extend_by_ed25519_id);

        params.extend_by_ed25519_id = false;
        assert!(!params.extend_by_ed25519_id);
    }
    /// An [`IncomingStreamRequestFilter`] for tests that accepts every request.
    #[cfg(feature = "hs-service")]
    struct AllowAllStreamsFilter;
    #[cfg(feature = "hs-service")]
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
        /// Return `Accept` unconditionally; both the request context and the
        /// circuit view are ignored.
        fn disposition(
            &mut self,
            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
            _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
        }
    }
    // A circuit supports a single incoming-stream listener: while the handle
    // from the first `allow_stream_requests` call is still alive, a second call
    // must return an error.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests_twice() {
        // The only relay command we ask the circuit to accept.
        const ACCEPTED_CMDS: &[tor_cell::relaycell::RelayCmd] =
            &[tor_cell::relaycell::RelayCmd::BEGIN];

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, _send) = newcirc(&rt, chan).await;

            // First registration succeeds; keep its handle alive.
            let _first = circ
                .allow_stream_requests(
                    ACCEPTED_CMDS,
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            // There can only be one IncomingStream at a time on any given
            // circuit, so registering again has to fail.
            let second = circ
                .allow_stream_requests(
                    ACCEPTED_CMDS,
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await;
            assert!(second.is_err());
        });
    }
    // Happy path for incoming streams: a BEGIN arriving on the circuit is
    // surfaced as a stream request, the "service" accepts it, and the DATA the
    // "client" sends afterwards can be read back from the resulting stream.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";

            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;
            let rfmt = RelayCellFormat::V0;

            // Lets the "service" tell the "client" when it may start sending DATA.
            let (ready_tx, ready_rx) = oneshot::channel();

            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            let simulate_service = async move {
                let request = incoming.next().await.unwrap();
                let mut data_stream = request
                    .accept_data(relaymsg::Connected::new_empty())
                    .await
                    .unwrap();
                // The stream is accepted: DATA cells are welcome from now on.
                ready_tx.send(()).unwrap();

                // Read back exactly what the client is going to send.
                let mut received = [0_u8; TEST_DATA.len()];
                data_stream.read_exact(&mut received).await.unwrap();
                assert_eq!(&received, TEST_DATA);
                circ
            };

            let simulate_client = async move {
                // Encode a relay message for stream 12 as a RELAY channel message.
                let to_relay_cell = |msg: AnyRelayMsg| {
                    let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                    chanmsg::Relay::from(body)
                };

                // Pretend to be a client at the other end of the circuit
                // sending a begin cell.
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let begin_msg = to_relay_cell(AnyRelayMsg::Begin(begin));
                send.send(ClientCircChanMsg::Relay(begin_msg))
                    .await
                    .unwrap();

                // Wait until the service is ready to accept data.
                // TODO: we shouldn't need to wait! This is needed because the
                // service will reject any DATA cells that aren't associated
                // with a known stream. We need to wait until the service
                // receives our BEGIN cell (and the reactor updates hop.map
                // with the new stream).
                ready_rx.await.unwrap();

                // Now send some data along the newly established stream.
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let data_msg = to_relay_cell(AnyRelayMsg::Data(data));
                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
                send
            };

            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    // Rejecting one incoming stream request must not prevent a later request
    // with the same stream ID from being accepted and carrying data.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn accept_stream_after_reject() {
        use tor_cell::relaycell::msg::BeginFlags;
        use tor_cell::relaycell::msg::EndReason;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            const STREAM_COUNT: usize = 2;

            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;

            // The "service" posts one notification per request it has handled.
            let (mut handled_tx, mut handled_rx) = mpsc::channel(STREAM_COUNT);

            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            let simulate_service = async move {
                // Handle two requests: refuse the first, accept the second.
                for i in 0..STREAM_COUNT {
                    let request = incoming.next().await.unwrap();
                    if i == 0 {
                        request
                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                            .await
                            .unwrap();
                        // Tell the client the request has been dealt with.
                        handled_tx.send(()).await.unwrap();
                    } else {
                        let mut data_stream = request
                            .accept_data(relaymsg::Connected::new_empty())
                            .await
                            .unwrap();
                        // Tell the client we're ready to accept DATA cells.
                        handled_tx.send(()).await.unwrap();

                        // Read the data the client sent us.
                        let mut received = [0_u8; TEST_DATA.len()];
                        data_stream.read_exact(&mut received).await.unwrap();
                        assert_eq!(&received, TEST_DATA);
                    }
                }
                circ
            };

            let simulate_client = async move {
                // Encode a relay message for stream 12 as a RELAY channel message.
                let to_relay_cell = |msg: AnyRelayMsg| {
                    let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                    chanmsg::Relay::from(body)
                };

                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let begin_msg = to_relay_cell(AnyRelayMsg::Begin(begin));

                // Pretend to be a client at the other end of the circuit
                // sending 2 identical begin cells (the first one will be
                // rejected by the test service).
                for _ in 0..STREAM_COUNT {
                    send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
                        .await
                        .unwrap();
                    // Wait until the service has rejected or accepted this
                    // request before going on.
                    handled_rx.next().await.unwrap();
                }

                // Now send some data along the newly established stream.
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let data_msg = to_relay_cell(AnyRelayMsg::Data(data));
                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
                send
            };

            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    // Incoming streams are only allowed from hop `EXPECTED_HOP`; a BEGIN that
    // arrives from a different hop must end the stream of incoming requests.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn incoming_stream_bad_hop() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            /// Expect the originator of the BEGIN cell to be hop 1.
            const EXPECTED_HOP: u8 = 1;

            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;

            // Register for incoming streams from hop EXPECTED_HOP only.
            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    EXPECTED_HOP.into(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            let simulate_service = async move {
                // The originator of the cell is actually the last hop on the
                // circuit, not hop 1, so we expect the reactor to shut down,
                // which shows up here as the request stream ending.
                let next_request = incoming.next().await;
                assert!(next_request.is_none());
                circ
            };

            let simulate_client = async move {
                // Pretend to be a client at the other end of the circuit
                // sending a begin cell on stream 12.
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let outer = AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin));
                let body: BoxedCellBody = outer.encode(rfmt, &mut testing_rng()).unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                send.send(ClientCircChanMsg::Relay(begin_msg))
                    .await
                    .unwrap();
                send
            };

            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    // Linking must be refused for tunnels whose circuits fail validation; the
    // refusal is reported through the handshake-result channel.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_circ_validation() {
        use std::error::Error as _;
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let default_params = CircParameters::default();
            let invalid_tunnels = [
                // The two circuits don't end in the same hop (no join point).
                setup_bad_conflux_tunnel(&rt).await,
                // Identical hops, but built with default circuit parameters.
                // NOTE(review): presumably rejected because of the parameters
                // rather than the path -- confirm against the reactor's
                // validation logic.
                setup_conflux_tunnel(&rt, true, default_params).await,
            ];

            for TestTunnelCtx {
                tunnel: _tunnel,
                circs: _circs,
                conflux_link_rx,
            } in invalid_tunnels
            {
                // The reactor answers the link request with an error whose
                // source explains that validation failed.
                let hs_err = conflux_link_rx.await.unwrap().unwrap_err();
                let cause = hs_err.source().unwrap().to_string();
                assert!(cause.contains("one more more conflux circuits are invalid"));
            }
        });
    }
    // TODO: this structure could be reused for the other tests,
    // to address nickm's comment:
    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
    /// The handles a test needs to drive one circuit of a conflux tunnel.
    ///
    /// `setup_conflux_tunnel` fills this in from the values returned by
    /// `working_fake_channel` and `newcirc_ext`.
    #[derive(Debug)]
    // NOTE(review): presumably needed because not every test reads every field.
    #[allow(unused)]
    #[cfg(feature = "conflux")]
    struct TestCircuitCtx {
        /// Receiver from the fake channel; tests read the cells the circuit
        /// sends from here.
        chan_rx: Receiver<AnyChanCell>,
        /// Sender half returned by `working_fake_channel`.
        /// NOTE(review): presumably feeds inbound cells to the channel -- confirm.
        chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
        /// Sender used by tests to deliver messages to the circuit.
        circ_sink: CircuitRxSender,
    }
    /// Everything `setup_conflux_tunnel` hands back to a test.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct TestTunnelCtx {
        /// The circuit that was told to link with the other one.
        tunnel: Arc<ClientCirc>,
        /// Per-circuit test handles, in the order the circuits were created.
        circs: Vec<TestCircuitCtx>,
        /// Receives the reply to the `LinkCircuits` request.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    }
    /// Read the next cell from `rx` and return the CONFLUX_LINK message it carries.
    ///
    /// Panics if no cell arrives, if the cell is not a RELAY cell holding a
    /// single decodable V0 message, if that message is not a CONFLUX_LINK, or
    /// if it was sent on a stream rather than at circuit level.
    #[cfg(feature = "conflux")]
    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();

        let decoded = match chmsg {
            AnyChanMsg::Relay(cell) => {
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, cell.into_relay_body())
            }
            other => panic!("{:?}", other),
        };
        let (streamid, rmsg) = decoded.unwrap().into_streamid_and_msg();

        let link = match rmsg {
            AnyRelayMsg::ConfluxLink(link) => link,
            other => panic!("unexpected relay message {other:?}"),
        };
        // LINK is a circuit-level message, so it must not name a stream.
        assert!(streamid.is_none());
        link
    }
    /// Create two 3-hop circuits on separate fake channels and ask the first to
    /// link with the second.
    ///
    /// With `same_hops` set, both circuits use identical hop details; otherwise
    /// the second circuit gets its own. Both are built with `params`.
    ///
    /// The second circuit's reactor is shut down so that its circuit can be
    /// handed to the first in a `LinkCircuits` request. The reply to that
    /// request arrives on the returned `conflux_link_rx`; this function does
    /// not wait for it.
    #[cfg(feature = "conflux")]
    async fn setup_conflux_tunnel(
        rt: &MockRuntime,
        same_hops: bool,
        params: CircParameters,
    ) -> TestTunnelCtx {
        let first_hops = hop_details(3, 0);
        let second_hops = if same_hops {
            first_hops.clone()
        } else {
            hop_details(3, 10)
        };

        // First leg: this one remains the tunnel's handle.
        let (first_chan, first_rx, first_chan_sink) = working_fake_channel(rt);
        let (first_circ, first_sink) = newcirc_ext(
            rt,
            UniqId::new(1, 3),
            first_chan,
            first_hops,
            2.into(),
            params.clone(),
        )
        .await;

        // Second leg: built the same way, then taken apart below.
        let (second_chan, second_rx, second_chan_sink) = working_fake_channel(rt);
        let (second_circ, second_sink) = newcirc_ext(
            rt,
            UniqId::new(2, 4),
            second_chan,
            second_hops,
            2.into(),
            params,
        )
        .await;

        // Have the second reactor shut down and give up its circuit.
        let (circuit_tx, circuit_rx) = oneshot::channel();
        second_circ
            .command
            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: circuit_tx })
            .unwrap();
        let detached_circuit = circuit_rx.await.unwrap().unwrap();

        // The circuit should be shutting down its reactor
        rt.advance_until_stalled().await;
        assert!(second_circ.is_closing());

        // Tell the first circuit to link with the second and form a multipath tunnel
        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
        first_circ
            .control
            .unbounded_send(CtrlMsg::LinkCircuits {
                circuits: vec![detached_circuit],
                answer: conflux_link_tx,
            })
            .unwrap();

        TestTunnelCtx {
            tunnel: first_circ,
            circs: vec![
                TestCircuitCtx {
                    chan_rx: first_rx,
                    chan_tx: first_chan_sink,
                    circ_sink: first_sink,
                },
                TestCircuitCtx {
                    chan_rx: second_rx,
                    chan_tx: second_chan_sink,
                    circ_sink: second_sink,
                },
            ],
            conflux_link_rx,
        }
    }
    /// Set up a conflux tunnel whose two circuits use identical hops, built with
    /// the parameters from `build_cc_vegas_params()`.
    #[cfg(feature = "conflux")]
    async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
        let vegas_params = CircParameters::new(true, build_cc_vegas_params());
        // Our 2 test circuits are identical, so they both have the same guards,
        // which technically violates the conflux set rule mentioned in prop354.
        // For testing purposes this is fine, but in production we'll need to
        // ensure the calling code prevents guard reuse (except in the case
        // where one of the guards happens to be Guard + Exit).
        setup_conflux_tunnel(rt, /* same_hops = */ true, vegas_params).await
    }
    /// Set up a conflux tunnel whose two circuits have different hops, built
    /// with the parameters from `build_cc_vegas_params()`.
    #[cfg(feature = "conflux")]
    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
        let vegas_params = CircParameters::new(true, build_cc_vegas_params());
        // The two circuits don't share any hops, so they won't end in the same
        // hop (no join point), causing the reactor to refuse to link them.
        setup_conflux_tunnel(rt, /* same_hops = */ false, vegas_params).await
    }
    // A CONFLUX_LINKED cell arriving on a circuit that never started a conflux
    // handshake must leave the circuit closing.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn reject_conflux_linked_before_hs() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;

            // Build a LINKED message around a freshly generated nonce.
            let payload = V1LinkPayload::new(
                V1Nonce::new(&mut testing_rng()),
                V1DesiredUx::NO_OPINION,
            );
            let linked = relaymsg::ConfluxLinked::new(payload).into();

            // Deliver it at circuit level and let the reactor react.
            sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
            rt.advance_until_stalled().await;

            assert!(circ.is_closing());
        });
    }
    // If only the first leg of a tunnel answers its LINK with a LINKED, the
    // handshake must succeed on that leg and time out on the other one.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_hs_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let TestTunnelCtx {
                tunnel: _tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;
            let [mut leg1, _leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            // Wait for the LINK cell on the first leg...
            let link = await_link_payload(&mut leg1.chan_rx).await;

            // ...and answer it with a LINKED cell echoing the same payload.
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            let reply = rmsg_to_ccmsg(None, linked);
            leg1.circ_sink.send(reply).await.unwrap();

            // Leave the second leg unanswered and let enough mock time pass
            // for its handshake to time out.
            rt.advance_by(Duration::from_secs(60)).await;

            // One result per leg, in leg order.
            let hs_results = conflux_link_rx.await.unwrap().unwrap();
            let [first, second]: [StdResult<(), ConfluxHandshakeError>; 2] =
                hs_results.try_into().unwrap();

            assert!(first.is_ok());
            let err = second.unwrap_err();
            assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
        });
    }
    // For each kind of bogus handshake response, set up a fresh tunnel, deliver
    // the bad cell on the second leg, and check that the handshake reports the
    // expected `CircProto` error and that the tunnel ends up closing.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_hs() {
        use crate::util::err::ConfluxHandshakeError;
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // A payload with a freshly generated nonce, used by the first two
            // cases below.
            let nonce = V1Nonce::new(&mut testing_rng());
            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
            // Only needed by the disabled EXTENDED2 case further down.
            //let extended2 = relaymsg::Extended2::new(vec![]).into();
            // Pairs of (cell to deliver, error text the handshake must report).
            let bad_hs_responses = [
                (
                    rmsg_to_ccmsg(
                        None,
                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
                    ),
                    "Received CONFLUX_LINKED cell with mismatched nonce",
                ),
                (
                    rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
                ),
                (
                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
                    "Received CONFLUX_SWITCH on unlinked circuit?!",
                ),
                // TODO: this currently causes the reactor to shut down immediately,
                // without sending a response on the handshake channel
                /*
                (
                    rmsg_to_ccmsg(None, extended2),
                    "Received CONFLUX_LINKED cell with mismatched nonce",
                ),
                */
            ];
            for (bad_cell, expected_err) in bad_hs_responses {
                // Every case gets its own tunnel, since the previous one is
                // closing by the end of its iteration.
                let TestTunnelCtx {
                    tunnel,
                    circs,
                    conflux_link_rx,
                } = setup_good_conflux_tunnel(&rt).await;
                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
                // Respond with a bogus cell on one of the legs
                circ2.circ_sink.send(bad_cell).await.unwrap();
                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
                // Get the handshake results (the handshake results are reported early,
                // without waiting for the second circuit leg's handshake to timeout,
                // because this is a protocol violation causing the entire tunnel to shut down)
                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
                    conflux_hs_res.try_into().unwrap();
                match res2.unwrap_err() {
                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
                        assert_eq!(e, expected_err);
                    }
                    e => panic!("unexpected error: {e:?}"),
                }
                assert!(tunnel.is_closing());
            }
        });
    }
    /// Conflux cells delivered to a circuit that this test never linked into
    /// a conflux set must cause that circuit to start closing.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn unexpected_conflux_cell() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let payload =
                V1LinkPayload::new(V1Nonce::new(&mut testing_rng()), V1DesiredUx::NO_OPINION);

            let linked = relaymsg::ConfluxLinked::new(payload.clone()).into();
            let link = relaymsg::ConfluxLink::new(payload).into();
            let switch = relaymsg::ConfluxSwitch::new(0).into();

            let unexpected_cells = [
                rmsg_to_ccmsg(None, linked),
                rmsg_to_ccmsg(None, link),
                rmsg_to_ccmsg(None, switch),
            ];

            for cell in unexpected_cells {
                // A fresh channel and circuit for every cell
                let (chan, _rx, _sink) = working_fake_channel(&rt);
                let (circ, mut circ_sink) = newcirc(&rt, chan).await;

                circ_sink.send(cell).await.unwrap();
                rt.advance_until_stalled().await;

                // Note: unfortunately we can't assert the circuit is
                // closing for the right reason, because the reactor just logs
                // the error and then exits.
                assert!(circ.is_closing());
            }
        });
    }
    /// A second LINKED cell on a leg that already received one must tear the
    /// tunnel down.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_linked() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx: _,
            } = setup_good_conflux_tunnel(&rt).await;
            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            // Every LINKED cell below echoes the payload of the LINK cell
            // read from the first leg.
            let link = await_link_payload(&mut circ1.chan_rx).await;
            let payload = link.payload().clone();

            // Send a LINKED cell on the first leg...
            let first_leg_linked = relaymsg::ConfluxLinked::new(payload.clone()).into();
            circ1
                .circ_sink
                .send(rmsg_to_ccmsg(None, first_leg_linked))
                .await
                .unwrap();

            // ...and two LINKED cells on the second
            for _ in 0..2 {
                let second_leg_linked = relaymsg::ConfluxLinked::new(payload.clone()).into();
                circ2
                    .circ_sink
                    .send(rmsg_to_ccmsg(None, second_leg_linked))
                    .await
                    .unwrap();
            }

            rt.advance_until_stalled().await;

            // Receiving a LINKED cell on an already linked leg causes
            // the tunnel to be torn down
            assert!(tunnel.is_closing());
        });
    }
    /// After a successful handshake on both legs, an invalid SWITCH cell on
    /// each leg must cause the tunnel to shut down.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_switch() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let invalid_switches = [
                // SWITCH cells with seqno = 0 are not allowed
                relaymsg::ConfluxSwitch::new(0),
                // TODO(#2031): from c-tor:
                //
                // We have to make sure that the switch command is truly
                // incrementing the sequence number, or else it becomes
                // a side channel that can be spammed for traffic analysis.
                //
                // We should figure out what this check is supposed to look like,
                // and have a test for it
            ];

            for switch in invalid_switches {
                let TestTunnelCtx {
                    tunnel,
                    circs,
                    conflux_link_rx,
                } = setup_good_conflux_tunnel(&rt).await;
                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

                let link = await_link_payload(&mut circ1.chan_rx).await;
                let payload = link.payload().clone();

                // Complete the handshake: one LINKED cell per leg
                for sink in [&mut circ1.circ_sink, &mut circ2.circ_sink] {
                    let linked = relaymsg::ConfluxLinked::new(payload.clone()).into();
                    sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
                }

                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));

                // Now send the bad SWITCH cell on *both* legs.
                // This will cause both legs to be removed from the conflux set,
                // which causes the tunnel reactor to shut down
                for sink in [&mut circ1.circ_sink, &mut circ2.circ_sink] {
                    let cell = rmsg_to_ccmsg(None, switch.clone().into());
                    sink.send(cell).await.unwrap();
                }

                // The tunnel should be shutting down
                rt.advance_until_stalled().await;
                assert!(tunnel.is_closing());
            }
        });
    }
    // TODO(conflux): add a test for SWITCH handling
    /// One participant in a conflux test: either a mock exit relay,
    /// or the client driving the tunnel.
    ///
    /// `run_conflux_endpoint` consumes a value of this type and runs the
    /// corresponding task.
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    enum ConfluxTestEndpoint {
        /// Pretend to be an exit relay, using the given per-leg state.
        Relay(ConfluxExitState),
        /// Client task.
        Client {
            /// Channel for receiving the outcome of the conflux handshakes.
            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
            /// The tunnel reactor handle
            tunnel: Arc<ClientCirc>,
            /// Data to send on a stream.
            stream_data: Vec<u8>,
        },
    }
    /// Structure for returning the sinks, channels, etc. that must stay
    /// alive until the test is complete.
    // The `allow(unused)` is presumably needed because these payloads are
    // only held, never read back.
    #[allow(unused)]
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    enum ConfluxEndpointResult {
        /// Returned by the client task: the tunnel handle it was given.
        Circuit(Arc<ClientCirc>),
        /// Returned by a mock exit task: the two channel ends it was using.
        Relay {
            /// The receiver the mock exit read the client's cells from.
            rx: Receiver<ChanCell<AnyChanMsg>>,
            /// The sender the mock exit used for its replies.
            sink: CircuitRxSender,
        },
    }
    /// Stream data, shared by all the mock exit endpoints.
    ///
    /// The mock exits access it through an `Arc<Mutex<_>>`.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxStreamState {
        /// The payloads of the DATA cells received so far, appended in the
        /// order in which the mock exits processed them.
        data_recvd: Vec<u8>,
        /// The total number of stream bytes the mock exits wait for.
        expected_data_len: usize,
        /// Whether a BEGIN cell has been received on any leg.
        begin_recvd: bool,
        /// Whether an END cell has been received on any leg.
        end_recvd: bool,
    }
    /// Everything a mock exit task needs in order to play one leg of a
    /// conflux tunnel.
    ///
    /// Consumed by `run_mock_conflux_exit`.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxExitState {
        /// The mock runtime, used for advancing time to simulate delays.
        runtime: MockRuntime,
        /// If set, how far to advance time between receiving the LINK cell
        /// and replying with LINKED.
        init_rtt_delay: Option<Duration>,
        /// If set, how far to advance time before sending the circuit-level
        /// SENDME (which is sent once 100 DATA cells were received on this leg).
        rtt_delay: Option<Duration>,
        /// The index of this leg (the mock exit only uses it in a panic message).
        leg: usize,
        /// Receives the cells the client sends on this leg.
        rx: Receiver<ChanCell<AnyChanMsg>>,
        /// Used for sending this exit's replies.
        sink: CircuitRxSender,
        /// Stream bookkeeping shared with the other mock exit(s).
        stream_state: Arc<Mutex<ConfluxStreamState>>,
        /// For each SWITCH cell expected on this leg, in order of arrival:
        /// the number of cells (counting the SWITCH itself) received on this
        /// leg after the handshake by the time that SWITCH arrives.
        expect_switch: Vec<usize>,
    }
    /// Play the exit's side of a well-behaved conflux handshake on one leg.
    ///
    /// Waits for the client's LINK cell on `rx`, optionally advances
    /// `runtime` by `init_rtt_delay`, answers on `sink` with a LINKED cell
    /// echoing the LINK payload, and then expects the next cell on `rx` to be
    /// a LINKED_ACK.
    ///
    /// Panics if the cell following the LINKED reply is anything else.
    #[cfg(feature = "conflux")]
    async fn good_exit_handshake(
        runtime: &MockRuntime,
        init_rtt_delay: Option<Duration>,
        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
        sink: &mut CircuitRxSender,
    ) {
        // The handshake starts with the client's LINK cell
        let link_cell = await_link_payload(rx).await;

        // An artificial delay here makes this circuit's initial RTT
        // worse than that of a circuit without one
        if let Some(delay) = init_rtt_delay {
            runtime.advance_by(delay).await;
        }

        // Echo the payload back in a LINKED cell...
        let reply = relaymsg::ConfluxLinked::new(link_cell.payload().clone()).into();
        sink.send(rmsg_to_ccmsg(None, reply)).await.unwrap();

        // ...and expect the client to answer with LINKED_ACK
        let next_cell = rx.next().await.unwrap();
        let (_id, chmsg) = next_cell.into_circid_and_msg();
        let relay_body = match chmsg {
            AnyChanMsg::Relay(r) => r.into_relay_body(),
            other => panic!("{other:?}"),
        };
        let decoded =
            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay_body).unwrap();
        let (_streamid, rmsg) = decoded.into_streamid_and_msg();

        assert!(matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_)));
    }
    /// Run a mock exit on one leg of a conflux tunnel.
    ///
    /// After completing the conflux handshake, this reads cells from the leg
    /// until either the shared stream state holds `expected_data_len` bytes
    /// or this leg receives the stream's END cell:
    ///
    ///   * the first BEGIN (across all legs) is answered with CONNECTED,
    ///   * DATA payloads are appended to the shared buffer,
    ///   * each SWITCH must arrive at the cell count given by `expect_switch`,
    ///   * once 100 DATA cells were received on this leg, a circuit-level
    ///     SENDME is sent (after the optional `rtt_delay`).
    ///
    /// Panics on a second BEGIN or END, and on any other kind of message.
    ///
    /// Returns the channel ends, so the caller can keep them alive.
    #[cfg(feature = "conflux")]
    async fn run_mock_conflux_exit(state: ConfluxExitState) -> ConfluxEndpointResult {
        let ConfluxExitState {
            runtime,
            init_rtt_delay,
            rtt_delay,
            leg,
            mut rx,
            mut sink,
            stream_state,
            mut expect_switch,
        } = state;

        // Do the conflux handshake
        good_exit_handshake(&runtime, init_rtt_delay, &mut rx, &mut sink).await;

        // Expect the client to open a stream, and de-multiplex the received stream data
        let stream_len = stream_state.lock().unwrap().expected_data_len;
        // DATA cells seen on this leg
        let mut data_cells_received = 0_usize;
        // All cells seen on this leg since the handshake
        let mut cell_count = 0_usize;

        while stream_state.lock().unwrap().data_recvd.len() < stream_len {
            // Wait for the next cell to arrive on this leg...
            let next_cell = rx.next().await.unwrap();
            let (_id, chmsg) = next_cell.into_circid_and_msg();
            cell_count += 1;

            let relay_body = match chmsg {
                AnyChanMsg::Relay(r) => r.into_relay_body(),
                other => panic!("{:?}", other),
            };
            let decoded =
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay_body).unwrap();
            let (streamid, rmsg) = decoded.into_streamid_and_msg();

            // Snapshot the shared flags (the lock is released right away,
            // before anything is awaited)
            let (begin_recvd, end_recvd) = {
                let shared = stream_state.lock().unwrap();
                (shared.begin_recvd, shared.end_recvd)
            };

            match rmsg {
                AnyRelayMsg::Begin(_) => {
                    if begin_recvd {
                        panic!("client tried to open two streams?!");
                    }
                    stream_state.lock().unwrap().begin_recvd = true;
                    // Reply with a connected cell...
                    let connected = relaymsg::Connected::new_empty().into();
                    sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                }
                AnyRelayMsg::End(_) => {
                    if end_recvd {
                        panic!("received two END cells for the same stream?!");
                    }
                    stream_state.lock().unwrap().end_recvd = true;
                    break;
                }
                AnyRelayMsg::ConfluxSwitch(_) => {
                    // Ensure we got the SWITCH after the expected number of cells
                    let cells_until_switch = expect_switch.remove(0);
                    assert_eq!(cells_until_switch, cell_count);

                    // To keep the tests simple, we don't handle out of order cells,
                    // and simply sort the received data at the end.
                    // This ensures all the data was actually received,
                    // but it doesn't actually test that the SWITCH cells
                    // contain the appropriate seqnos.
                    continue;
                }
                AnyRelayMsg::Data(dat) => {
                    data_cells_received += 1;
                    stream_state
                        .lock()
                        .unwrap()
                        .data_recvd
                        .extend_from_slice(dat.as_ref());
                }
                _ => panic!("unexpected message {rmsg:?} on leg {leg}"),
            }

            // The rest of the iteration only applies once exactly
            // 100 DATA cells have been counted on this leg
            if data_cells_received != 100 {
                continue;
            }

            // Introduce an artificial delay, to make one circ have worse RTT
            // than the other, and thus trigger a SWITCH
            if let Some(delay) = rtt_delay {
                runtime.advance_by(delay).await;
            }

            // Make and send a circuit-level SENDME
            let sendme =
                relaymsg::Sendme::new_tag(hex!("2100000000000000000000000000000000000000"))
                    .into();
            sink.send(rmsg_to_ccmsg(None, sendme)).await.unwrap();
        }

        ConfluxEndpointResult::Relay { rx, sink }
    }
    /// Run the client side of a conflux test.
    ///
    /// Waits for the handshake outcome on `conflux_link_rx` (expecting one
    /// result per leg, for two legs), then opens a stream on `tunnel`, and
    /// writes and flushes all of `stream_data` on it.
    ///
    /// Returns the tunnel handle, so the caller can keep it alive.
    #[cfg(feature = "conflux")]
    async fn run_conflux_client(
        tunnel: Arc<ClientCirc>,
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
        stream_data: Vec<u8>,
    ) -> ConfluxEndpointResult {
        // Once both legs have reported a handshake result,
        // we have a multipath tunnel
        let handshake_results = conflux_link_rx.await.unwrap().unwrap();
        assert_eq!(handshake_results.len(), 2);

        // Open a stream over the tunnel...
        let begin_result = tunnel.begin_stream("www.example.com", 443, None).await;
        let mut stream = begin_result.unwrap();

        // ...and push all of the data through it
        stream.write_all(&stream_data).await.unwrap();
        stream.flush().await.unwrap();

        ConfluxEndpointResult::Circuit(tunnel)
    }
    /// Run a conflux test endpoint to completion.
    ///
    /// A relay endpoint is driven by `run_mock_conflux_exit`, and a client
    /// endpoint by `run_conflux_client`; either way, the result carries the
    /// objects the caller needs to keep alive.
    #[cfg(feature = "conflux")]
    async fn run_conflux_endpoint(endpoint: ConfluxTestEndpoint) -> ConfluxEndpointResult {
        match endpoint {
            ConfluxTestEndpoint::Client {
                conflux_link_rx,
                stream_data,
                tunnel,
            } => {
                let client = run_conflux_client(tunnel, conflux_link_rx, stream_data);
                client.await
            }
            ConfluxTestEndpoint::Relay(exit_state) => {
                let exit = run_mock_conflux_exit(exit_state);
                exit.await
            }
        }
    }
    /// Send a stream over a two-leg conflux tunnel, with a mock exit on each
    /// leg, and check that all of the stream data reaches the exits.
    ///
    /// The mock exits also check that the SWITCH cells arrive after the
    /// expected number of cells on each leg (see `expect_switch`).
    ///
    /// The mock exits append DATA payloads to a shared buffer in arrival
    /// order, and do not reorder cells that arrive on different legs.
    /// The received bytes are therefore compared with the sent bytes as
    /// sorted sequences: this verifies that all the data was received,
    /// but not the order in which it was delivered.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_stream() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            /// How many cells' worth of stream data to send.
            const NUM_CELLS: usize = 300;
            /// Bytes of stream data per cell
            /// (presumably the DATA payload capacity of a V0 relay cell).
            const CELL_SIZE: usize = 498;

            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;
            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            // The stream data we're going to send over the conflux tunnel
            let stream_data = (0..255_u8)
                .cycle()
                .take(NUM_CELLS * CELL_SIZE)
                .collect::<Vec<_>>();
            let stream_state = Arc::new(Mutex::new(ConfluxStreamState {
                data_recvd: vec![],
                expected_data_len: stream_data.len(),
                begin_recvd: false,
                end_recvd: false,
            }));

            let mut tasks = vec![];

            // Note: we can't have two advance_by() calls running
            // at the same time (it's a limitation of MockRuntime),
            // so we need to be careful to not cause concurrent delays
            // on the two circuits.
            let relays = [
                (
                    circ1.chan_rx,
                    circ1.circ_sink,
                    // We expect the client to start sending on the leg with no initial RTT delay,
                    // and then switch to the one with the lower overall RTT
                    vec![2],
                    None,
                    Some(Duration::from_millis(300)),
                ),
                (
                    circ2.chan_rx,
                    circ2.circ_sink,
                    vec![1],
                    Some(Duration::from_millis(200)),
                    None,
                ),
            ];

            // One mock exit task per leg...
            for (leg, (rx, sink, expect_switch, init_rtt_delay, rtt_delay)) in
                relays.into_iter().enumerate()
            {
                let relay = ConfluxTestEndpoint::Relay(ConfluxExitState {
                    runtime: rt.clone(),
                    leg,
                    init_rtt_delay,
                    rtt_delay,
                    rx,
                    sink,
                    stream_state: Arc::clone(&stream_state),
                    expect_switch,
                });
                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
            }

            // ...plus the client task
            tasks.push(rt.spawn_join(
                "client task".to_string(),
                run_conflux_endpoint(ConfluxTestEndpoint::Client {
                    tunnel,
                    conflux_link_rx,
                    stream_data: stream_data.clone(),
                }),
            ));

            // The results hold the sinks, channels, etc.; keep them alive
            // until the checks below are done.
            let _sinks = futures::future::join_all(tasks).await;

            let mut stream_state = stream_state.lock().unwrap();
            assert!(stream_state.begin_recvd);
            assert!(stream_state.end_recvd);

            // Work around the lack of handling of out-of-order cells at the
            // mock exit: compare the sent and received data as sorted sequences.
            let mut expected_data = stream_data;
            expected_data.sort_unstable();
            stream_state.data_recvd.sort_unstable();
            assert_eq!(stream_state.data_recvd, expected_data);
        });
    }
    /// Asking a linked two-leg conflux tunnel to allow incoming stream
    /// requests must fail with an error naming the number of legs.
    #[traced_test]
    #[test]
    #[cfg(all(feature = "conflux", feature = "hs-service"))]
    fn conflux_incoming_stream() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            use std::error::Error as _;

            const EXPECTED_HOP: u8 = 1;

            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;
            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            // Complete the handshake by answering with LINKED on both legs,
            // echoing the payload of the LINK cell read from the first leg
            let link = await_link_payload(&mut circ1.chan_rx).await;
            let payload = link.payload().clone();
            for sink in [&mut circ1.circ_sink, &mut circ2.circ_sink] {
                let linked = relaymsg::ConfluxLinked::new(payload.clone()).into();
                sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
            }

            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));

            // TODO(#2002): we don't currently support conflux for onion services
            let allowed_cmds = [tor_cell::relaycell::RelayCmd::BEGIN];
            let outcome = tunnel
                .allow_stream_requests(&allowed_cmds, EXPECTED_HOP.into(), AllowAllStreamsFilter)
                .await;
            // IncomingStream doesn't impl Debug, so we need to map to a different type
            let err = outcome.map(|_| ()).unwrap_err();

            let err_src = err.source().unwrap();
            assert!(err_src
                .to_string()
                .contains("Cannot allow stream requests on tunnel with 2 legs"));
        });
    }
}