1
//! Multi-hop paths over the Tor network.
2
//!
3
//! Right now, we only implement "client circuits" -- also sometimes
4
//! called "origin circuits".  A client circuit is one that is
5
//! constructed by this Tor instance, and used in its own behalf to
6
//! send data over the Tor network.
7
//!
8
//! Each circuit has multiple hops over the Tor network: each hop
9
//! knows only the hop before and the hop after.  The client shares a
10
//! separate set of keys with each hop.
11
//!
12
//! To build a circuit, first create a [crate::channel::Channel], then
13
//! call its [crate::channel::Channel::new_circ] method.  This yields
14
//! a [PendingClientCirc] object that won't become live until you call
15
//! one of the methods
16
//! (typically [`PendingClientCirc::create_firsthop`])
17
//! that extends it to its first hop.  After you've
18
//! done that, you can call [`ClientCirc::extend`] on the circuit to
19
//! build it into a multi-hop circuit.  Finally, you can use
20
//! [ClientCirc::begin_stream] to get a Stream object that can be used
21
//! for anonymized data.
22
//!
23
//! # Implementation
24
//!
25
//! Each open circuit has a corresponding Reactor object that runs in
26
//! an asynchronous task, and manages incoming cells from the
27
//! circuit's upstream channel.  These cells are either RELAY cells or
28
//! DESTROY cells.  DESTROY cells are handled immediately.
29
//! RELAY cells are either for a particular stream, in which case they
30
//! get forwarded to a RawCellStream object, or for no particular stream,
31
//! in which case they are considered "meta" cells (like EXTENDED2)
32
//! that should only get accepted if something is waiting for them.
33
//!
34
//! # Limitations
35
//!
36
//! This is client-only.
37

            
38
pub(crate) mod celltypes;
39
pub(crate) mod halfcirc;
40

            
41
#[cfg(feature = "hs-common")]
42
pub mod handshake;
43
#[cfg(not(feature = "hs-common"))]
44
pub(crate) mod handshake;
45

            
46
pub(super) mod path;
47
pub(crate) mod unique_id;
48

            
49
use crate::channel::Channel;
50
use crate::congestion::params::CongestionControlParams;
51
use crate::crypto::cell::HopNum;
52
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53
use crate::memquota::{CircuitAccount, SpecificAccount as _};
54
use crate::stream::queue::stream_queue;
55
use crate::stream::xon_xoff::XonXoffReaderCtrl;
56
use crate::stream::{
57
    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
58
    StreamRateLimit, StreamReceiver,
59
};
60
use crate::tunnel::circuit::celltypes::*;
61
use crate::tunnel::reactor::CtrlCmd;
62
use crate::tunnel::reactor::{
63
    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
64
};
65
use crate::tunnel::{StreamTarget, TargetHop};
66
use crate::util::notify::NotifySender;
67
use crate::util::skew::ClockSkew;
68
use crate::{Error, ResolveError, Result};
69
use educe::Educe;
70
use path::HopDetail;
71
use postage::watch;
72
use tor_cell::{
73
    chancell::CircId,
74
    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
75
};
76

            
77
use tor_error::{bad_api_usage, internal, into_internal};
78
use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
79
use tor_protover::named;
80

            
81
pub use crate::crypto::binding::CircuitBinding;
82
pub use crate::memquota::StreamAccount;
83
pub use crate::tunnel::circuit::unique_id::UniqId;
84

            
85
#[cfg(feature = "hs-service")]
86
use {
87
    crate::stream::{IncomingCmdChecker, IncomingStream},
88
    crate::tunnel::reactor::StreamReqInfo,
89
};
90

            
91
use futures::channel::mpsc;
92
use oneshot_fused_workaround as oneshot;
93

            
94
use crate::congestion::sendme::StreamRecvWindow;
95
use crate::DynTimeProvider;
96
use futures::FutureExt as _;
97
use std::collections::HashMap;
98
use std::net::IpAddr;
99
use std::sync::{Arc, Mutex};
100
use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
101

            
102
use crate::crypto::handshake::ntor::NtorPublicKey;
103

            
104
pub use path::{Path, PathEntry};
105

            
106
/// The size of the buffer for communication between `ClientCirc` and its reactor.
107
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
108

            
109
#[cfg(feature = "send-control-msg")]
110
use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
111

            
112
pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
113
#[cfg(feature = "send-control-msg")]
114
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
115
pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
116

            
117
/// MPSC queue relating to a stream (either inbound or outbound), sender
118
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
119
/// MPSC queue relating to a stream (either inbound or outbound), receiver
120
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
121

            
122
/// MPSC queue for inbound data on its way from channel to circuit, sender
123
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
124
/// MPSC queue for inbound data on its way from channel to circuit, receiver
125
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
126

            
127
#[derive(Debug)]
128
/// A circuit that we have constructed over the Tor network.
129
///
130
/// # Circuit life cycle
131
///
132
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
133
/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
134
/// one of these, you invoke one of its `create_firsthop` methods (typically
135
/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
136
/// [`create_firsthop()`](PendingClientCirc::create_firsthop)).
137
/// Then, to add more hops to the circuit, you can call
138
/// [`extend()`](ClientCirc::extend) on it.
139
///
140
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
141
/// `tor-proto` are probably not what you need.
142
///
143
/// After a circuit is created, it will persist until it is closed in one of
144
/// five ways:
145
///    1. A remote error occurs.
146
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
147
///       circuit.
148
///    3. The circuit's channel is closed.
149
///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
150
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
151
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
152
///       circuit from closing until all those streams have gone away.)
153
///
154
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
155
/// will just be unusable for most purposes.  Most operations on it will fail
156
/// with an error.
157
//
158
// Effectively, this struct contains two Arcs: one for `path` and one for
159
// `control` (which surely has something Arc-like in it).  We cannot unify
160
// these by putting a single Arc around the whole struct, and passing
161
// an Arc strong reference to the `Reactor`, because then `control` would
162
// not be dropped when the last user of the circuit goes away.  We could
163
// make the reactor have a weak reference but weak references are more
164
// expensive to dereference.
165
//
166
// Because of the above, cloning this struct is always going to involve
167
// two atomic refcount changes/checks.  Wrapping it in another Arc would
168
// be overkill.
169
//
170
pub struct ClientCirc {
171
    /// Mutable state shared with the `Reactor`.
172
    mutable: Arc<TunnelMutableState>,
173
    /// A unique identifier for this circuit.
174
    unique_id: UniqId,
175
    /// Channel to send control messages to the reactor.
176
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
177
    /// Channel to send commands to the reactor.
178
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
179
    /// A future that resolves to Cancelled once the reactor is shut down,
180
    /// meaning that the circuit is closed.
181
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
182
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
183
    /// For testing purposes: the CircId, for use in peek_circid().
184
    #[cfg(test)]
185
    circid: CircId,
186
    /// Memory quota account
187
    memquota: CircuitAccount,
188
    /// Time provider
189
    time_provider: DynTimeProvider,
190
}
191

            
192
/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
193
///
194
/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
195
/// but it is currently the best option we have for sharing
196
/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
197
/// In practice, these mutexes won't be accessed very often
198
/// (they're accessed for writing when a circuit is extended,
199
/// and for reading by the various `ClientCirc` APIs),
200
/// so they shouldn't really impact performance.
201
///
202
/// Alternatively, the circuit state information could be shared
203
/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
204
/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
205
/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
206
/// asynchronous, which will significantly complicate their callsites,
207
/// which would in turn need to be made async too.
208
///
209
/// We should revisit this decision at some point, and decide whether an async API
210
/// would be preferable.
211
#[derive(Debug, Default)]
212
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
213

            
214
impl TunnelMutableState {
215
    /// Add the [`MutableState`] of a circuit.
216
312
    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
217
312
        #[allow(unused)] // unused in non-debug builds
218
312
        let state = self
219
312
            .0
220
312
            .lock()
221
312
            .expect("lock poisoned")
222
312
            .insert(unique_id, mutable);
223
312

            
224
312
        debug_assert!(state.is_none());
225
312
    }
226

            
227
    /// Remove the [`MutableState`] of a circuit.
228
28
    pub(super) fn remove(&self, unique_id: UniqId) {
229
28
        #[allow(unused)] // unused in non-debug builds
230
28
        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
231
28

            
232
28
        debug_assert!(state.is_some());
233
28
    }
234

            
235
    /// Return a [`Path`] object describing all the hops in the specified circuit.
236
    ///
237
    /// See [`MutableState::path`].
238
32
    fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
239
32
        let lock = self.0.lock().expect("lock poisoned");
240
32
        let mutable = lock
241
32
            .get(&unique_id)
242
32
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
243

            
244
32
        Ok(mutable.path())
245
32
    }
246

            
247
    /// Return a description of the first hop of this circuit.
248
    ///
249
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
250
    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
251
    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
252
        let lock = self.0.lock().expect("lock poisoned");
253
        let mutable = lock
254
            .get(&unique_id)
255
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
256

            
257
        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
258
            path::HopDetail::Relay(r) => r,
259
            #[cfg(feature = "hs-common")]
260
            path::HopDetail::Virtual => {
261
                panic!("somehow made a circuit with a virtual first hop.")
262
            }
263
        });
264

            
265
        Ok(first_hop)
266
    }
267

            
268
    /// Return the [`HopNum`] of the last hop of the specified circuit.
269
    ///
270
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
271
    ///
272
    /// See [`MutableState::last_hop_num`].
273
32
    fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
274
32
        let lock = self.0.lock().expect("lock poisoned");
275
32
        let mutable = lock
276
32
            .get(&unique_id)
277
32
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
278

            
279
32
        Ok(mutable.last_hop_num())
280
32
    }
281

            
282
    /// Return the number of hops in the specified circuit.
283
    ///
284
    /// See [`MutableState::n_hops`].
285
80
    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
286
80
        let lock = self.0.lock().expect("lock poisoned");
287
80
        let mutable = lock
288
80
            .get(&unique_id)
289
80
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
290

            
291
80
        Ok(mutable.n_hops())
292
80
    }
293

            
294
    /// Return the number of legs in this tunnel.
295
    ///
296
    /// TODO(conflux-fork): this can be removed once we modify `path_ref`
297
    /// to return *all* the Paths in the tunnel.
298
44
    fn n_legs(&self) -> usize {
299
44
        let lock = self.0.lock().expect("lock poisoned");
300
44
        lock.len()
301
44
    }
302
}
303

            
304
/// The mutable state of a circuit.
305
32
#[derive(Educe, Default)]
306
#[educe(Debug)]
307
pub(super) struct MutableState(Mutex<CircuitState>);
308

            
309
impl MutableState {
310
    /// Add a hop to the path of this circuit.
311
744
    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
312
744
        let mut mutable = self.0.lock().expect("poisoned lock");
313
744
        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
314
744
        mutable.binding.push(binding);
315
744
    }
316

            
317
    /// Get a copy of the circuit's current [`path::Path`].
318
128
    pub(super) fn path(&self) -> Arc<path::Path> {
319
128
        let mutable = self.0.lock().expect("poisoned lock");
320
128
        Arc::clone(&mutable.path)
321
128
    }
322

            
323
    /// Return the cryptographic material used to prove knowledge of a shared
324
    /// secret with with `hop`.
325
    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
326
        let mutable = self.0.lock().expect("poisoned lock");
327

            
328
        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
329
        // NOTE: I'm not thrilled to have to copy this information, but we use
330
        // it very rarely, so it's not _that_ bad IMO.
331
    }
332

            
333
    /// Return a description of the first hop of this circuit.
334
    fn first_hop(&self) -> Option<HopDetail> {
335
        let mutable = self.0.lock().expect("poisoned lock");
336
        mutable.path.first_hop()
337
    }
338

            
339
    /// Return the [`HopNum`] of the last hop of this circuit.
340
    ///
341
    /// NOTE: This function will return the [`HopNum`] of the hop
342
    /// that is _currently_ the last. If there is an extend operation in progress,
343
    /// the currently pending hop may or may not be counted, depending on whether
344
    /// the extend operation finishes before this call is done.
345
32
    fn last_hop_num(&self) -> Option<HopNum> {
346
32
        let mutable = self.0.lock().expect("poisoned lock");
347
32
        mutable.path.last_hop_num()
348
32
    }
349

            
350
    /// Return the number of hops in this circuit.
351
    ///
352
    /// NOTE: This function will currently return only the number of hops
353
    /// _currently_ in the circuit. If there is an extend operation in progress,
354
    /// the currently pending hop may or may not be counted, depending on whether
355
    /// the extend operation finishes before this call is done.
356
80
    fn n_hops(&self) -> usize {
357
80
        let mutable = self.0.lock().expect("poisoned lock");
358
80
        mutable.path.n_hops()
359
80
    }
360
}
361

            
362
/// The shared state of a circuit.
363
32
#[derive(Educe, Default)]
364
#[educe(Debug)]
365
pub(super) struct CircuitState {
366
    /// Information about this circuit's path.
367
    ///
368
    /// This is stored in an Arc so that we can cheaply give a copy of it to
369
    /// client code; when we need to add a hop (which is less frequent) we use
370
    /// [`Arc::make_mut()`].
371
    path: Arc<path::Path>,
372

            
373
    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
374
    /// in the circuit's path.
375
    ///
376
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
377
    /// fair chance that this will change in the future, and I don't want other
378
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
379
    /// an `Option`.
380
    #[educe(Debug(ignore))]
381
    binding: Vec<Option<CircuitBinding>>,
382
}
383

            
384
/// A ClientCirc that needs to send a create cell and receive a created* cell.
385
///
386
/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
387
/// to negotiate the cryptographic handshake with the first hop.
388
pub struct PendingClientCirc {
389
    /// A oneshot receiver on which we'll receive a CREATED* cell,
390
    /// or a DESTROY cell.
391
    recvcreated: oneshot::Receiver<CreateResponse>,
392
    /// The ClientCirc object that we can expose on success.
393
    circ: Arc<ClientCirc>,
394
}
395

            
396
/// Description of the network's current rules for building circuits.
397
///
398
/// This type describes rules derived from the consensus,
399
/// and possibly amended by our own configuration.
400
///
401
/// Typically, this type created once for an entire circuit,
402
/// and any special per-hop information is derived
403
/// from each hop as a CircTarget.
404
/// Note however that callers _may_ provide different `CircParameters`
405
/// for different hops within a circuit if they have some reason to do so,
406
/// so we do not enforce that every hop in a circuit has the same `CircParameters`.
407
#[non_exhaustive]
408
#[derive(Clone, Debug)]
409
pub struct CircParameters {
410
    /// Whether we should include ed25519 identities when we send
411
    /// EXTEND2 cells.
412
    pub extend_by_ed25519_id: bool,
413
    /// Congestion control parameters for this circuit.
414
    pub ccontrol: CongestionControlParams,
415

            
416
    /// Maximum number of permitted incoming relay cells for each hop.
417
    ///
418
    /// If we would receive more relay cells than this from a single hop,
419
    /// we close the circuit with [`ExcessInboundCells`](Error::ExcessInboundCells).
420
    ///
421
    /// If this value is None, then there is no limit to the number of inbound cells.
422
    ///
423
    /// Known limitation: If this value if `u32::MAX`,
424
    /// then a limit of `u32::MAX - 1` is enforced.
425
    pub n_incoming_cells_permitted: Option<u32>,
426

            
427
    /// Maximum number of permitted outgoing relay cells for each hop.
428
    ///
429
    /// If we would try to send more relay cells than this from a single hop,
430
    /// we close the circuit with [`ExcessOutboundCells`](Error::ExcessOutboundCells).
431
    /// It is the circuit-user's responsibility to make sure that this does not happen.
432
    ///
433
    /// This setting is used to ensure that we do not violate a limit
434
    /// imposed by `n_incoming_cells_permitted`
435
    /// on the other side of a circuit.
436
    ///
437
    /// If this value is None, then there is no limit to the number of outbound cells.
438
    ///
439
    /// Known limitation: If this value if `u32::MAX`,
440
    /// then a limit of `u32::MAX - 1` is enforced.
441
    pub n_outgoing_cells_permitted: Option<u32>,
442
}
443

            
444
/// The settings we use for single hop of a circuit.
445
///
446
/// Unlike [`CircParameters`], this type is crate-internal.
447
/// We construct it based on our settings from the circuit,
448
/// and from the hop's actual capabilities.
449
/// Then, we negotiate with the hop as part of circuit
450
/// creation/extension to determine the actual settings that will be in use.
451
/// Finally, we use those settings to construct the negotiated circuit hop.
452
//
453
// TODO: Relays should probably derive an instance of this type too, as
454
// part of the circuit creation handshake.
455
#[derive(Clone, Debug)]
456
pub(super) struct HopSettings {
457
    /// The negotiated congestion control settings for this circuit.
458
    pub(super) ccontrol: CongestionControlParams,
459

            
460
    /// Maximum number of permitted incoming relay cells for this hop.
461
    pub(super) n_incoming_cells_permitted: Option<u32>,
462

            
463
    /// Maximum number of permitted outgoing relay cells for this hop.
464
    pub(super) n_outgoing_cells_permitted: Option<u32>,
465
}
466

            
467
impl HopSettings {
468
    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
469
    /// and `caps` (a set of protocol capabilities for a circuit target).
470
    ///
471
    /// The resulting settings will represent what the client would prefer to negotiate
472
    /// (determined by `params`),
473
    /// as modified by what the target relay is believed to support (represented by `caps`).
474
    ///
475
    /// This represents the `HopSettings` in a pre-negotiation state:
476
    /// the circuit negotiation process will modify it.
477
    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
478
776
    pub(super) fn from_params_and_caps(
479
776
        params: &CircParameters,
480
776
        caps: &tor_protover::Protocols,
481
776
    ) -> Result<Self> {
482
776
        let mut settings = Self {
483
776
            ccontrol: params.ccontrol.clone(),
484
776
            n_incoming_cells_permitted: params.n_incoming_cells_permitted,
485
776
            n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
486
776
        };
487
776

            
488
776
        match settings.ccontrol.alg() {
489
504
            crate::ccparams::Algorithm::FixedWindow(_) => {}
490
            crate::ccparams::Algorithm::Vegas(_) => {
491
                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
492
272
                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
493
                    settings.ccontrol.use_fallback_alg();
494
272
                }
495
            }
496
        }
497

            
498
776
        Ok(settings)
499
776
    }
500

            
501
    /// Return a new `HopSettings` based on this one,
502
    /// representing the settings that we should use
503
    /// if circuit negotiation will be impossible.
504
    ///
505
    /// (Circuit negotiation is impossible when using the legacy ntor protocol,
506
    /// and when using CRATE_FAST.  It is currently unsupported with virtual hops.)
507
56
    pub(super) fn without_negotiation(mut self) -> Self {
508
56
        self.ccontrol.use_fallback_alg();
509
56
        self
510
56
    }
511
}
512

            
513
#[cfg(test)]
514
impl std::default::Default for CircParameters {
515
222
    fn default() -> Self {
516
222
        Self {
517
222
            extend_by_ed25519_id: true,
518
222
            ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
519
222
            n_incoming_cells_permitted: None,
520
222
            n_outgoing_cells_permitted: None,
521
222
        }
522
222
    }
523
}
524

            
525
impl CircParameters {
526
    /// Constructor
527
569
    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
528
569
        Self {
529
569
            extend_by_ed25519_id,
530
569
            ccontrol,
531
569
            n_incoming_cells_permitted: None,
532
569
            n_outgoing_cells_permitted: None,
533
569
        }
534
569
    }
535
}
536

            
537
impl ClientCirc {
538
    /// Return a description of the first hop of this circuit.
539
    ///
540
    /// # Panics
541
    ///
542
    /// Panics if there is no first hop.  (This should be impossible outside of
543
    /// the tor-proto crate, but within the crate it's possible to have a
544
    /// circuit with no hops.)
545
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
546
        Ok(self
547
            .mutable
548
            .first_hop(self.unique_id)
549
            .map_err(|_| Error::CircuitClosed)?
550
            .expect("called first_hop on an un-constructed circuit"))
551
    }
552

            
553
    /// Return a description of the last hop of the circuit.
554
    ///
555
    /// Return None if the last hop is virtual.
556
    ///
557
    /// See caveats on [`ClientCirc::last_hop_num()`].
558
    ///
559
    /// # Panics
560
    ///
561
    /// Panics if there is no last hop.  (This should be impossible outside of
562
    /// the tor-proto crate, but within the crate it's possible to have a
563
    /// circuit with no hops.)
564
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
565
        let path = self.path_ref()?;
566
        Ok(path
567
            .hops()
568
            .last()
569
            .expect("Called last_hop an an un-constructed circuit")
570
            .as_chan_target()
571
            .map(OwnedChanTarget::from_chan_target))
572
    }
573

            
574
    /// Return the [`HopNum`] of the last hop of this circuit.
575
    ///
576
    /// Returns an error if there is no last hop.  (This should be impossible outside of the
577
    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
578
    ///
579
    /// NOTE: This function will return the [`HopNum`] of the hop
580
    /// that is _currently_ the last. If there is an extend operation in progress,
581
    /// the currently pending hop may or may not be counted, depending on whether
582
    /// the extend operation finishes before this call is done.
583
    pub fn last_hop_num(&self) -> Result<HopNum> {
584
        Ok(self
585
            .mutable
586
            .last_hop_num(self.unique_id)?
587
            .ok_or_else(|| internal!("no last hop index"))?)
588
    }
589

            
590
    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
591
    /// HopLocation with its id and hop number.
592
    ///
593
    /// Return an error if there is no last hop.
594
32
    pub fn last_hop(&self) -> Result<TargetHop> {
595
32
        let hop_num = self
596
32
            .mutable
597
32
            .last_hop_num(self.unique_id)?
598
32
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
599
32
        Ok((self.unique_id, hop_num).into())
600
32
    }
601

            
602
    /// Return a [`Path`] object describing all the hops in this circuit.
603
    ///
604
    /// Note that this `Path` is not automatically updated if the circuit is
605
    /// extended.
606
32
    pub fn path_ref(&self) -> Result<Arc<Path>> {
607
32
        self.mutable
608
32
            .path_ref(self.unique_id)
609
32
            .map_err(|_| Error::CircuitClosed)
610
32
    }
611

            
612
    /// Get the clock skew claimed by the first hop of the circuit.
613
    ///
614
    /// See [`Channel::clock_skew()`].
615
    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
616
        let (tx, rx) = oneshot::channel();
617

            
618
        self.control
619
            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
620
            .map_err(|_| Error::CircuitClosed)?;
621

            
622
        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
623
    }
624

            
625
    /// Return a reference to this circuit's memory quota account
626
68
    pub fn mq_account(&self) -> &CircuitAccount {
627
68
        &self.memquota
628
68
    }
629

            
630
    /// Return the cryptographic material used to prove knowledge of a shared
631
    /// secret with with `hop`.
632
    ///
633
    /// See [`CircuitBinding`] for more information on how this is used.
634
    ///
635
    /// Return None if we have no circuit binding information for the hop, or if
636
    /// the hop does not exist.
637
    #[cfg(feature = "hs-service")]
638
    pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
639
        let (sender, receiver) = oneshot::channel();
640
        let msg = CtrlCmd::GetBindingKey { hop, done: sender };
641
        self.command
642
            .unbounded_send(msg)
643
            .map_err(|_| Error::CircuitClosed)?;
644

            
645
        receiver.await.map_err(|_| Error::CircuitClosed)?
646
    }
647

            
648
    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
649
    ///
650
    /// To use this:
651
    ///
652
    ///  0. Create an inter-task channel you'll use to receive
653
    ///     the outcome of your conversation,
654
    ///     and bundle it into a [`MsgHandler`].
655
    ///
656
    ///  1. Call `start_conversation`.
657
    ///     This will install a your handler, for incoming messages,
658
    ///     and send the outgoing message (if you provided one).
659
    ///     After that, each message on the circuit
660
    ///     that isn't handled by the core machinery
661
    ///     is passed to your provided `reply_handler`.
662
    ///
663
    ///  2. Possibly call `send_msg` on the [`Conversation`],
664
    ///     from the call site of `start_conversation`,
665
    ///     possibly multiple times, from time to time,
666
    ///     to send further desired messages to the peer.
667
    ///
668
    ///  3. In your [`MsgHandler`], process the incoming messages.
669
    ///     You may respond by
670
    ///     sending additional messages
671
    ///     When the protocol exchange is finished,
672
    ///     `MsgHandler::handle_msg` should return
673
    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
674
    ///
675
    /// If you don't need the `Conversation` to send followup messages,
676
    /// you may simply drop it,
677
    /// and rely on the responses you get from your handler,
678
    /// on the channel from step 0 above.
679
    /// Your handler will remain installed and able to process incoming messages
680
    /// until it returns `ConversationFinished`.
681
    ///
682
    /// (If you don't want to accept any replies at all, it may be
683
    /// simpler to use [`ClientCirc::send_raw_msg`].)
684
    ///
685
    /// Note that it is quite possible to use this function to violate the tor
686
    /// protocol; most users of this API will not need to call it.  It is used
687
    /// to implement most of the onion service handshake.
688
    ///
689
    /// # Limitations
690
    ///
691
    /// Only one conversation may be active at any one time,
692
    /// for any one circuit.
693
    /// This generally means that this function should not be called
694
    /// on a circuit which might be shared with anyone else.
695
    ///
696
    /// Likewise, it is forbidden to try to extend the circuit,
697
    /// while the conversation is in progress.
698
    ///
699
    /// After the conversation has finished, the circuit may be extended.
700
    /// Or, `start_conversation` may be called again;
701
    /// but, in that case there will be a gap between the two conversations,
702
    /// during which no `MsgHandler` is installed,
703
    /// and unexpected incoming messages would close the circuit.
704
    ///
705
    /// If these restrictions are violated, the circuit will be closed with an error.
706
    ///
707
    /// ## Precise definition of the lifetime of a conversation
708
    ///
709
    /// A conversation is in progress from entry to `start_conversation`,
710
    /// until entry to the body of the [`MsgHandler::handle_msg`]
711
    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
712
    /// (*Entry* since `handle_msg` is synchronously embedded
713
    /// into the incoming message processing.)
714
    /// So you may start a new conversation as soon as you have the final response
715
    /// via your inter-task channel from (0) above.
716
    ///
717
    /// The lifetime relationship of the [`Conversation`],
718
    /// vs the handler returning `ConversationFinished`
719
    /// is not enforced by the type system.
720
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
721
    // at least while allowing sending followup messages from outside the handler.
722
    //
723
    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
724
    //   tor-proto interface.
725
    #[cfg(feature = "send-control-msg")]
726
    pub async fn start_conversation(
727
        &self,
728
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
729
        reply_handler: impl MsgHandler + Send + 'static,
730
        hop: TargetHop,
731
    ) -> Result<Conversation<'_>> {
732
        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
733
        // the right Leg/Hop with inbound cell.
734
        let (sender, receiver) = oneshot::channel();
735
        self.command
736
            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
737
            .map_err(|_| Error::CircuitClosed)?;
738
        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
739
        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
740
        let conversation = Conversation(self);
741
        conversation.send_internal(msg, Some(handler)).await?;
742
        Ok(conversation)
743
    }
744

            
745
    /// Send an ad-hoc message to a given hop on the circuit, without expecting
746
    /// a reply.
747
    ///
748
    /// (If you want to handle one or more possible replies, see
749
    /// [`ClientCirc::start_conversation`].)
750
    #[cfg(feature = "send-control-msg")]
751
    pub async fn send_raw_msg(
752
        &self,
753
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
754
        hop: TargetHop,
755
    ) -> Result<()> {
756
        let (sender, receiver) = oneshot::channel();
757
        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
758
        self.control
759
            .unbounded_send(ctrl_msg)
760
            .map_err(|_| Error::CircuitClosed)?;
761

            
762
        receiver.await.map_err(|_| Error::CircuitClosed)?
763
    }
764

            
765
    /// Tell this circuit to begin allowing the final hop of the circuit to try
766
    /// to create new Tor streams, and to return those pending requests in an
767
    /// asynchronous stream.
768
    ///
769
    /// Ordinarily, these requests are rejected.
770
    ///
771
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
772
    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
773
    /// an error.
774
    ///
775
    /// After this method has been called on a circuit, the circuit is expected
776
    /// to receive requests of this type indefinitely, until it is finally closed.
777
    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
778
    ///
779
    /// Only onion services (and eventually) exit relays should call this
780
    /// method.
781
    //
782
    // TODO: Someday, we might want to allow a stream request handler to be
783
    // un-registered.  However, nothing in the Tor protocol requires it.
784
    #[cfg(feature = "hs-service")]
785
44
    pub async fn allow_stream_requests(
786
44
        self: &Arc<ClientCirc>,
787
44
        allow_commands: &[tor_cell::relaycell::RelayCmd],
788
44
        hop: TargetHop,
789
44
        filter: impl crate::stream::IncomingStreamRequestFilter,
790
44
    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
791
        use futures::stream::StreamExt;
792

            
793
        /// The size of the channel receiving IncomingStreamRequestContexts.
794
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
795

            
796
        // TODO(#2002): support onion service conflux
797
44
        let circ_count = self.mutable.n_legs();
798
44
        if circ_count != 1 {
799
4
            return Err(
800
4
                internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
801
4
            );
802
40
        }
803
40

            
804
40
        let time_prov = self.time_provider.clone();
805
40
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
806
40
        let (incoming_sender, incoming_receiver) =
807
40
            MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
808
40
        let (tx, rx) = oneshot::channel();
809
40

            
810
40
        self.command
811
40
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
812
40
                cmd_checker,
813
40
                incoming_sender,
814
40
                hop,
815
40
                done: tx,
816
40
                filter: Box::new(filter),
817
40
            })
818
40
            .map_err(|_| Error::CircuitClosed)?;
819

            
820
        // Check whether the AwaitStreamRequest was processed successfully.
821
40
        rx.await.map_err(|_| Error::CircuitClosed)??;
822

            
823
32
        let allowed_hop_loc = match hop {
824
32
            TargetHop::Hop(loc) => Some(loc),
825
            _ => None,
826
        }
827
32
        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
828

            
829
32
        let circ = Arc::clone(self);
830
32
        Ok(incoming_receiver.map(move |req_ctx| {
831
24
            let StreamReqInfo {
832
24
                req,
833
24
                stream_id,
834
24
                hop,
835
24
                receiver,
836
24
                msg_tx,
837
24
                rate_limit_stream,
838
24
                drain_rate_request_stream,
839
24
                memquota,
840
24
                relay_cell_format,
841
24
            } = req_ctx;
842
24

            
843
24
            // We already enforce this in handle_incoming_stream_request; this
844
24
            // assertion is just here to make sure that we don't ever
845
24
            // accidentally remove or fail to enforce that check, since it is
846
24
            // security-critical.
847
24
            assert_eq!(allowed_hop_loc, hop);
848

            
849
            // TODO(#2002): figure out what this is going to look like
850
            // for onion services (perhaps we should forbid this function
851
            // from being called on a multipath circuit?)
852
            //
853
            // See also:
854
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
855
24
            let target = StreamTarget {
856
24
                circ: Arc::clone(&circ),
857
24
                tx: msg_tx,
858
24
                hop: allowed_hop_loc,
859
24
                stream_id,
860
24
                relay_cell_format,
861
24
                rate_limit_stream,
862
24
            };
863
24

            
864
24
            // can be used to build a reader that supports XON/XOFF flow control
865
24
            let xon_xoff_reader_ctrl =
866
24
                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
867
24

            
868
24
            let reader = StreamReceiver {
869
24
                target: target.clone(),
870
24
                receiver,
871
24
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
872
24
                ended: false,
873
24
            };
874
24

            
875
24
            let components = StreamComponents {
876
24
                stream_receiver: reader,
877
24
                target,
878
24
                memquota,
879
24
                xon_xoff_reader_ctrl,
880
24
            };
881
24

            
882
24
            IncomingStream::new(circ.time_provider.clone(), req, components)
883
32
        }))
884
44
    }
885

            
886
    /// Extend the circuit, via the most appropriate circuit extension handshake,
887
    /// to the chosen `target` hop.
888
    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
889
    where
890
        Tg: CircTarget,
891
    {
892
        // For now we use the simplest decision-making mechanism:
893
        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
894
        //
895
        // This behavior is slightly different from C tor, which uses ntor v3
896
        // only whenever it want to send any extension in the circuit message.
897
        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
898
        // want to use an extension if we can, and so it doesn't make too much
899
        // sense to detect the case where we have no extensions.
900
        //
901
        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
902
        // on the tor network, and so we cannot simply assume that everybody has it.)
903
        if target
904
            .protovers()
905
            .supports_named_subver(named::RELAY_NTORV3)
906
        {
907
            self.extend_ntor_v3(target, params).await
908
        } else {
909
            self.extend_ntor(target, params).await
910
        }
911
    }
912

            
913
    /// Extend the circuit via the ntor handshake to a new target last
914
    /// hop.
915
40
    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
916
40
    where
917
40
        Tg: CircTarget,
918
40
    {
919
40
        let key = NtorPublicKey {
920
40
            id: *target
921
40
                .rsa_identity()
922
40
                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
923
40
            pk: *target.ntor_onion_key(),
924
        };
925
40
        let mut linkspecs = target
926
40
            .linkspecs()
927
40
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
928
40
        if !params.extend_by_ed25519_id {
929
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
930
40
        }
931

            
932
40
        let (tx, rx) = oneshot::channel();
933
40

            
934
40
        let peer_id = OwnedChanTarget::from_chan_target(target);
935
40
        let settings =
936
40
            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
937
40
        self.control
938
40
            .unbounded_send(CtrlMsg::ExtendNtor {
939
40
                peer_id,
940
40
                public_key: key,
941
40
                linkspecs,
942
40
                settings,
943
40
                done: tx,
944
40
            })
945
40
            .map_err(|_| Error::CircuitClosed)?;
946

            
947
40
        rx.await.map_err(|_| Error::CircuitClosed)??;
948

            
949
8
        Ok(())
950
40
    }
951

            
952
    /// Extend the circuit via the ntor handshake to a new target last
953
    /// hop.
954
8
    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
955
8
    where
956
8
        Tg: CircTarget,
957
8
    {
958
8
        let key = NtorV3PublicKey {
959
8
            id: *target
960
8
                .ed_identity()
961
8
                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
962
8
            pk: *target.ntor_onion_key(),
963
        };
964
8
        let mut linkspecs = target
965
8
            .linkspecs()
966
8
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
967
8
        if !params.extend_by_ed25519_id {
968
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
969
8
        }
970

            
971
8
        let (tx, rx) = oneshot::channel();
972
8

            
973
8
        let peer_id = OwnedChanTarget::from_chan_target(target);
974
8
        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
975
8
        self.control
976
8
            .unbounded_send(CtrlMsg::ExtendNtorV3 {
977
8
                peer_id,
978
8
                public_key: key,
979
8
                linkspecs,
980
8
                settings,
981
8
                done: tx,
982
8
            })
983
8
            .map_err(|_| Error::CircuitClosed)?;
984

            
985
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
986

            
987
8
        Ok(())
988
8
    }
989

            
990
    /// Extend this circuit by a single, "virtual" hop.
991
    ///
992
    /// A virtual hop is one for which we do not add an actual network connection
993
    /// between separate hosts (such as Relays).  We only add a layer of
994
    /// cryptography.
995
    ///
996
    /// This is used to implement onion services: the client and the service
997
    /// both build a circuit to a single rendezvous point, and tell the
998
    /// rendezvous point to relay traffic between their two circuits.  Having
999
    /// completed a [`handshake`] out of band[^1], the parties each extend their
    /// circuits by a single "virtual" encryption hop that represents their
    /// shared cryptographic context.
    ///
    /// Once a circuit has been extended in this way, it is an error to try to
    /// extend it in any other way.
    ///
    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
    ///     client sends their half of the handshake in an ` message, and the
    ///     service's response is inline in its `RENDEZVOUS2` message.
    //
    // TODO hs: let's try to enforce the "you can't extend a circuit again once
    // it has been extended this way" property.  We could do that with internal
    // state, or some kind of a type state pattern.
    #[cfg(feature = "hs-common")]
    pub async fn extend_virtual(
        &self,
        protocol: handshake::RelayProtocol,
        role: handshake::HandshakeRole,
        seed: impl handshake::KeyGenerator,
        params: &CircParameters,
        capabilities: &tor_protover::Protocols,
    ) -> Result<()> {
        use self::handshake::BoxedClientLayer;
        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
        let relay_cell_format = protocol.relay_cell_format();
        let BoxedClientLayer { fwd, back, binding } =
            protocol.construct_client_layers(role, seed)?;
        let settings = HopSettings::from_params_and_caps(params, capabilities)?
            // TODO #2037: We _should_ support negotiation here; see #2037.
            .without_negotiation();
        let (tx, rx) = oneshot::channel();
        let message = CtrlCmd::ExtendVirtual {
            relay_cell_format,
            cell_crypto: (fwd, back, binding),
            settings,
            done: tx,
        };
        self.command
            .unbounded_send(message)
            .map_err(|_| Error::CircuitClosed)?;
        rx.await.map_err(|_| Error::CircuitClosed)?
    }
    /// Helper, used to begin a stream.
    ///
    /// This function allocates a stream ID, and sends the message
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
    ///
    /// The caller will typically want to see the first cell in response,
    /// to see whether it is e.g. an END or a CONNECTED.
68
    async fn begin_stream_impl(
68
        self: &Arc<ClientCirc>,
68
        begin_msg: AnyRelayMsg,
68
        cmd_checker: AnyCmdChecker,
102
    ) -> Result<StreamComponents> {
68
        // TODO: Possibly this should take a hop, rather than just
68
        // assuming it's the last hop.
68
        let hop = TargetHop::LastHop;
68

            
68
        let time_prov = self.time_provider.clone();
68
        let memquota = StreamAccount::new(self.mq_account())?;
68
        let (sender, receiver) = stream_queue(STREAM_READER_BUFFER, &memquota, &time_prov)?;
68
        let (tx, rx) = oneshot::channel();
68
        let (msg_tx, msg_rx) =
68
            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
68
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
68

            
68
        // A channel for the reactor to request a new drain rate from the reader.
68
        // Typically this notification will be sent after an XOFF is sent so that the reader can
68
        // send us a new drain rate when the stream data queue becomes empty.
68
        let mut drain_rate_request_tx = NotifySender::new_typed();
68
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
68

            
68
        self.control
68
            .unbounded_send(CtrlMsg::BeginStream {
68
                hop,
68
                message: begin_msg,
68
                sender,
68
                rx: msg_rx,
68
                rate_limit_notifier: rate_limit_tx,
68
                drain_rate_requester: drain_rate_request_tx,
68
                done: tx,
68
                cmd_checker,
68
            })
68
            .map_err(|_| Error::CircuitClosed)?;
68
        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
68
        let target = StreamTarget {
68
            circ: self.clone(),
68
            tx: msg_tx,
68
            hop,
68
            stream_id,
68
            relay_cell_format,
68
            rate_limit_stream: rate_limit_rx,
68
        };
68

            
68
        // can be used to build a reader that supports XON/XOFF flow control
68
        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
68

            
68
        let stream_receiver = StreamReceiver {
68
            target: target.clone(),
68
            receiver,
68
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
68
            ended: false,
68
        };
68

            
68
        let components = StreamComponents {
68
            stream_receiver,
68
            target,
68
            memquota,
68
            xon_xoff_reader_ctrl,
68
        };
68

            
68
        Ok(components)
68
    }
    /// Start a DataStream (anonymized connection) to the given
    /// address and port, using a BEGIN cell.
68
    async fn begin_data_stream(
68
        self: &Arc<ClientCirc>,
68
        msg: AnyRelayMsg,
68
        optimistic: bool,
102
    ) -> Result<DataStream> {
68
        let components = self
68
            .begin_stream_impl(msg, DataCmdChecker::new_any())
68
            .await?;
        let StreamComponents {
68
            stream_receiver,
68
            target,
68
            memquota,
68
            xon_xoff_reader_ctrl,
68
        } = components;
68

            
68
        let mut stream = DataStream::new(
68
            self.time_provider.clone(),
68
            stream_receiver,
68
            xon_xoff_reader_ctrl,
68
            target,
68
            memquota,
68
        );
68
        if !optimistic {
60
            stream.wait_for_connection().await?;
8
        }
68
        Ok(stream)
68
    }
    /// Start a stream to the given address and port, using a BEGIN
    /// cell.
    ///
    /// The use of a string for the address is intentional: you should let
    /// the remote Tor relay do the hostname lookup for you.
60
    pub async fn begin_stream(
60
        self: &Arc<ClientCirc>,
60
        target: &str,
60
        port: u16,
60
        parameters: Option<StreamParameters>,
90
    ) -> Result<DataStream> {
60
        let parameters = parameters.unwrap_or_default();
60
        let begin_flags = parameters.begin_flags();
60
        let optimistic = parameters.is_optimistic();
60
        let target = if parameters.suppressing_hostname() {
            ""
        } else {
60
            target
        };
60
        let beginmsg = Begin::new(target, port, begin_flags)
60
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
60
        self.begin_data_stream(beginmsg.into(), optimistic).await
60
    }
    /// Start a new stream to the last relay in the circuit, using
    /// a BEGIN_DIR cell.
12
    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
8
        // Note that we always open begindir connections optimistically.
8
        // Since they are local to a relay that we've already authenticated
8
        // with and built a circuit to, there should be no additional checks
8
        // we need to perform to see whether the BEGINDIR will succeed.
8
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
8
            .await
8
    }
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
    /// in this circuit.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
        let resolve_msg = Resolve::new(hostname);
        let resolved_msg = self.try_resolve(resolve_msg).await?;
        resolved_msg
            .into_answers()
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
    /// the last relay on this circuit.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
        resolved_msg
            .into_answers()
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Hostname(v)) => Some(
                    String::from_utf8(v)
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
                ),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }
    /// Helper: Send the resolve message, and read resolved message from
    /// resolve stream.
    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
        let components = self
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
            .await?;
        let StreamComponents {
            stream_receiver,
            target: _,
            memquota,
            xon_xoff_reader_ctrl: _,
        } = components;
        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
        resolve_stream.read_msg().await
    }
    /// Shut down this circuit, along with all streams that are using it.
    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
    /// immediately after this function returns!).
    ///
    /// Note that other references to this circuit may exist.  If they
    /// do, they will stop working after you call this function.
    ///
    /// It's not necessary to call this method if you're just done
    /// with a circuit: the circuit should close on its own once nothing
    /// is using it any more.
    pub fn terminate(&self) {
        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
    }
    /// Called when a circuit-level protocol error has occurred and the
    /// circuit needs to shut down.
    ///
    /// This is a separate function because we may eventually want to have
    /// it do more than just shut down.
    ///
    /// As with `terminate`, this function is asynchronous.
    pub(crate) fn protocol_error(&self) {
        self.terminate();
    }
    /// Return true if this circuit is closed and therefore unusable.
88
    pub fn is_closing(&self) -> bool {
88
        self.control.is_closed()
88
    }
    /// Return a process-unique identifier for this circuit.
116
    pub fn unique_id(&self) -> UniqId {
116
        self.unique_id
116
    }
    /// Return the number of hops in this circuit.
    ///
    /// NOTE: This function will currently return only the number of hops
    /// _currently_ in the circuit. If there is an extend operation in progress,
    /// the currently pending hop may or may not be counted, depending on whether
    /// the extend operation finishes before this call is done.
80
    pub fn n_hops(&self) -> Result<usize> {
80
        self.mutable
80
            .n_hops(self.unique_id)
80
            .map_err(|_| Error::CircuitClosed)
80
    }
    /// Return a future that will resolve once this circuit has closed.
    ///
    /// Note that this method does not itself cause the circuit to shut down.
    ///
    /// TODO: Perhaps this should return some kind of status indication instead
    /// of just ()
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        self.reactor_closed_rx.clone().map(|_| ())
    }
}
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientCirc::start_conversation`],
/// and used to send messages to the last hop relay.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `MsgHandler` set up
    /// when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }
    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_msg`
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
        let (sender, receiver) = oneshot::channel();
        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
            msg,
            handler,
            sender,
        };
        self.0
            .control
            .unbounded_send(ctrl_msg)
            .map_err(|_| Error::CircuitClosed)?;
        receiver.await.map_err(|_| Error::CircuitClosed)?
    }
}
impl PendingClientCirc {
    /// Instantiate a new circuit object: used from Channel::new_circ().
    ///
    /// Does not send a CREATE* cell on its own.
    ///
    ///
272
    pub(crate) fn new(
272
        id: CircId,
272
        channel: Arc<Channel>,
272
        createdreceiver: oneshot::Receiver<CreateResponse>,
272
        input: CircuitRxReceiver,
272
        unique_id: UniqId,
272
        runtime: DynTimeProvider,
272
        memquota: CircuitAccount,
272
    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
272
        let time_provider = channel.time_provider().clone();
272
        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
272
            Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
272

            
272
        let circuit = ClientCirc {
272
            mutable,
272
            unique_id,
272
            control: control_tx,
272
            command: command_tx,
272
            reactor_closed_rx: reactor_closed_rx.shared(),
272
            #[cfg(test)]
272
            circid: id,
272
            memquota,
272
            time_provider,
272
        };
272

            
272
        let pending = PendingClientCirc {
272
            recvcreated: createdreceiver,
272
            circ: Arc::new(circuit),
272
        };
272
        (pending, reactor)
272
    }
    /// Extract the process-unique identifier for this pending circuit.
    pub fn peek_unique_id(&self) -> UniqId {
        self.circ.unique_id
    }
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
    /// first hop of this circuit.
    ///
    /// There's no authentication in CRATE_FAST,
    /// so we don't need to know whom we're connecting to: we're just
    /// connecting to whichever relay the channel is for.
12
    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
8
        // We no nothing about this relay, so we assume it supports no protocol capabilities at all.
8
        //
8
        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
8
        let protocols = tor_protover::Protocols::new();
8
        let settings =
8
            HopSettings::from_params_and_caps(&params, &protocols)?.without_negotiation();
8

            
8
        let (tx, rx) = oneshot::channel();
8
        self.circ
8
            .control
8
            .unbounded_send(CtrlMsg::Create {
8
                recv_created: self.recvcreated,
8
                handshake: CircuitHandshake::CreateFast,
8
                settings,
8
                done: tx,
8
            })
8
            .map_err(|_| Error::CircuitClosed)?;
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
8
        Ok(self.circ)
8
    }
    /// Use the most appropriate handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
    pub async fn create_firsthop<Tg>(
        self,
        target: &Tg,
        params: CircParameters,
    ) -> Result<Arc<ClientCirc>>
    where
        Tg: tor_linkspec::CircTarget,
    {
        // (See note in ClientCirc::extend.)
        if target
            .protovers()
            .supports_named_subver(named::RELAY_NTORV3)
        {
            self.create_firsthop_ntor_v3(target, params).await
        } else {
            self.create_firsthop_ntor(target, params).await
        }
    }
    /// Use the ntor handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
8
    pub async fn create_firsthop_ntor<Tg>(
8
        self,
8
        target: &Tg,
8
        params: CircParameters,
8
    ) -> Result<Arc<ClientCirc>>
8
    where
8
        Tg: tor_linkspec::CircTarget,
8
    {
8
        let (tx, rx) = oneshot::channel();
8
        let settings =
8
            HopSettings::from_params_and_caps(&params, target.protovers())?.without_negotiation();
8

            
8
        self.circ
8
            .control
8
            .unbounded_send(CtrlMsg::Create {
8
                recv_created: self.recvcreated,
8
                handshake: CircuitHandshake::Ntor {
8
                    public_key: NtorPublicKey {
8
                        id: *target
8
                            .rsa_identity()
8
                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
8
                        pk: *target.ntor_onion_key(),
8
                    },
8
                    ed_identity: *target
8
                        .ed_identity()
8
                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
                },
8
                settings,
8
                done: tx,
8
            })
8
            .map_err(|_| Error::CircuitClosed)?;
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
8
        Ok(self.circ)
8
    }
    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
    ///
    /// Assumes that the target supports ntor_v3. The caller should verify
    /// this before calling this function, e.g. by validating that the target
    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
16
    pub async fn create_firsthop_ntor_v3<Tg>(
16
        self,
16
        target: &Tg,
16
        params: CircParameters,
16
    ) -> Result<Arc<ClientCirc>>
16
    where
16
        Tg: tor_linkspec::CircTarget,
16
    {
16
        let settings = HopSettings::from_params_and_caps(&params, target.protovers())?;
16
        let (tx, rx) = oneshot::channel();
16

            
16
        self.circ
16
            .control
16
            .unbounded_send(CtrlMsg::Create {
16
                recv_created: self.recvcreated,
16
                handshake: CircuitHandshake::NtorV3 {
16
                    public_key: NtorV3PublicKey {
16
                        id: *target
16
                            .ed_identity()
16
                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
16
                        pk: *target.ntor_onion_key(),
16
                    },
16
                },
16
                settings,
16
                done: tx,
16
            })
16
            .map_err(|_| Error::CircuitClosed)?;
16
        rx.await.map_err(|_| Error::CircuitClosed)??;
16
        Ok(self.circ)
16
    }
}
/// A collection of components that can be combined to implement a Tor stream,
/// or anything that requires a stream ID.
///
/// Not all components may be needed, depending on the purpose of the "stream".
/// For example we build `RELAY_RESOLVE` requests like we do data streams,
/// but they won't use the `StreamTarget` as they don't need to send additional
/// messages.
#[derive(Debug)]
pub(crate) struct StreamComponents {
    /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
    pub(crate) stream_receiver: StreamReceiver,
    /// A handle that can communicate messages to the circuit reactor for this stream.
    pub(crate) target: StreamTarget,
    /// The memquota [account](tor_memquota::Account) to use for data on this stream.
    pub(crate) memquota: StreamAccount,
    /// The control information needed to add XON/XOFF flow control to the stream.
    pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
}
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
/// it represents an error.
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
    match val {
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
        _ => Ok(val),
    }
}
#[cfg(test)]
pub(crate) mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::channel::OpenChanCellS2C;
    use crate::channel::{test::new_reactor, CodecError};
    use crate::congestion::test_utils::params::build_cc_vegas_params;
    use crate::crypto::cell::RelayCellBody;
    use crate::crypto::handshake::ntor_v3::NtorV3Server;
    #[cfg(feature = "hs-service")]
    use crate::stream::IncomingStreamRequestFilter;
    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
    use futures::channel::mpsc::{Receiver, Sender};
    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::sink::SinkExt;
    use futures::stream::StreamExt;
    use futures::task::SpawnExt;
    use hex_literal::hex;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Debug;
    use std::time::Duration;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
    use tor_cell::relaycell::msg::SendmeTag;
    use tor_cell::relaycell::{
        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
    };
    use tor_linkspec::OwnedCircTarget;
    use tor_memquota::HasMemoryCost;
    use tor_rtcompat::Runtime;
    use tracing::trace;
    use tracing_test::traced_test;
    #[cfg(feature = "conflux")]
    use {
        crate::tunnel::reactor::ConfluxHandshakeResult,
        crate::util::err::ConfluxHandshakeError,
        futures::future::FusedFuture,
        futures::lock::Mutex as AsyncMutex,
        std::pin::Pin,
        std::result::Result as StdResult,
        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
        tor_cell::relaycell::msg::ConfluxLink,
        tor_rtmock::MockRuntime,
    };
    impl PendingClientCirc {
        /// Testing only: Extract the circuit ID for this pending circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circ.circid
        }
    }
    impl ClientCirc {
        /// Testing only: Extract the circuit ID of this circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circid
        }
    }
    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
        // TODO #1947: test other formats.
        let rfmt = RelayCellFormat::V0;
        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
            .encode(rfmt, &mut testing_rng())
            .unwrap();
        let chanmsg = chanmsg::Relay::from(body);
        ClientCircChanMsg::Relay(chanmsg)
    }
    // Example relay IDs and keys
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
    /// return an example OwnedCircTarget that can get used for an ntor handshake.
    fn example_target() -> OwnedCircTarget {
        let mut builder = OwnedCircTarget::builder();
        builder
            .chan_target()
            .ed_identity(EXAMPLE_ED_ID.into())
            .rsa_identity(EXAMPLE_RSA_ID.into());
        builder
            .ntor_onion_key(EXAMPLE_PK.into())
            .protocols("FlowCtrl=1-2".parse().unwrap())
            .build()
            .unwrap()
    }
    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
        crate::crypto::handshake::ntor::NtorSecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_RSA_ID.into(),
        )
    }
    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_ED_ID.into(),
        )
    }
    fn working_fake_channel<R: Runtime>(
        rt: &R,
    ) -> (
        Arc<Channel>,
        Receiver<AnyChanCell>,
        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    ) {
        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
        rt.spawn(async {
            let _ignore = chan_reactor.run().await;
        })
        .unwrap();
        (channel, rx, tx)
    }
    /// Which handshake type to use.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        Fast,
        Ntor,
        NtorV3,
    }
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
        // We want to try progressing from a pending circuit to a circuit
        // via a crate_fast handshake.
        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);
        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // Future to pretend to be a relay on the other end of the circuit.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), Some(circid));
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let mut reply_fn = if with_cc {
                        |client_exts: &[CircRequestExt]| {
                            let _ = client_exts
                                .iter()
                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
                                .expect("Client failed to request CC");
                            // This needs to be aligned to test_utils params
                            // value due to validation that needs it in range.
                            Some(vec![CircResponseExt::CcResponse(
                                extend_ext::CcResponse::new(31),
                            )])
                        }
                    } else {
                        |_: &_| Some(vec![])
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut reply_fn,
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            created_send.send(reply).unwrap();
        };
        // Future to pretend to be a client.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                HandshakeType::NtorV3 => {
                    let params = if with_cc {
                        // Setup CC vegas parameters.
                        CircParameters::new(true, build_cc_vegas_params())
                    } else {
                        params
                    };
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
        let _circ = circ.unwrap();
        // pfew!  We've build a circuit!  Let's make sure it has one hop.
        assert_eq!(_circ.n_hops().unwrap(), 1);
    }
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "flowctl-cc")]
    fn test_create_ntor_v3_with_cc() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, true).await;
        });
    }
    // An encryption layer that doesn't do any crypto.   Can be used
    // as inbound or outbound, but not both at once.
    pub(crate) struct DummyCrypto {
        counter_tag: [u8; 20],
        counter: u32,
        lasthop: bool,
    }
    impl DummyCrypto {
        fn next_tag(&mut self) -> SendmeTag {
            #![allow(clippy::identity_op)]
            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
            self.counter += 1;
            self.counter_tag.into()
        }
    }
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
            self.next_tag()
        }
        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
    }
    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
        fn decrypt_inbound(
            &mut self,
            _cmd: ChanCmd,
            _cell: &mut RelayCellBody,
        ) -> Option<SendmeTag> {
            if self.lasthop {
                Some(self.next_tag())
            } else {
                None
            }
        }
    }
    impl DummyCrypto {
        pub(crate) fn new(lasthop: bool) -> Self {
            DummyCrypto {
                counter_tag: [0; 20],
                counter: 0,
                lasthop,
            }
        }
    }
    // Helper: set up a 3-hop circuit with no encryption, where the
    // next inbound message seems to come from hop next_msg_from
    async fn newcirc_ext<R: Runtime>(
        rt: &R,
        unique_id: UniqId,
        chan: Arc<Channel>,
        hops: Vec<path::HopDetail>,
        next_msg_from: HopNum,
        params: CircParameters,
    ) -> (Arc<ClientCirc>, CircuitRxSender) {
        let circid = CircId::new(128).unwrap();
        let (_created_send, created_recv) = oneshot::channel();
        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        let PendingClientCirc {
            circ,
            recvcreated: _,
        } = pending;
        // TODO #1067: Support other formats
        let relay_cell_format = RelayCellFormat::V0;
        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
        for (idx, peer_id) in hops.into_iter().enumerate() {
            let (tx, rx) = oneshot::channel();
            let idx = idx as u8;
            circ.command
                .unbounded_send(CtrlCmd::AddFakeHop {
                    relay_cell_format,
                    fwd_lasthop: idx == last_hop_num,
                    rev_lasthop: idx == u8::from(next_msg_from),
                    peer_id,
                    params: params.clone(),
                    done: tx,
                })
                .unwrap();
            rx.await.unwrap().unwrap();
        }
        (circ, circmsg_send)
    }
    // Helper: set up a 3-hop circuit with no encryption, where the
    // next inbound message seems to come from hop next_msg_from
    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");
            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();
        let unique_id = UniqId::new(23, 17);
        newcirc_ext(
            rt,
            unique_id,
            chan,
            hops,
            2.into(),
            CircParameters::default(),
        )
        .await
    }
    /// Create `n` distinct [`path::HopDetail`]s,
    /// with the specified `start_idx` for the dummy identities.
    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
        (0..n)
            .map(|idx| {
                let peer_id = tor_linkspec::OwnedChanTarget::builder()
                    .ed_identity([idx + start_idx; 32].into())
                    .rsa_identity([idx + start_idx + 1; 20].into())
                    .build()
                    .expect("Could not construct fake hop");
                path::HopDetail::Relay(peer_id)
            })
            .collect()
    }
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let params = CircParameters::default();
        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
            };
            circ // gotta keep the circ alive, or the reactor would exit.
        };
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[CircRequestExt]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };
            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
        };
        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
        // Did we really add another hop?
        assert_eq!(circ.n_hops().unwrap(), 4);
        // Do the path accessors report a reasonable outcome?
        {
            let path = circ.path_ref().unwrap();
            let path = path
                .all_hops()
                .filter_map(|hop| match hop {
                    path::HopDetail::Relay(r) => Some(r),
                    #[cfg(feature = "hs-common")]
                    path::HopDetail::Virtual => None,
                })
                .collect::<Vec<_>>();
            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        {
            let path = circ.path_ref().unwrap();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
    #[traced_test]
    #[test]
    fn test_extend_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::Ntor).await;
        });
    }
    #[traced_test]
    #[test]
    fn test_extend_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::NtorV3).await;
        });
    }
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: ClientCircChanMsg,
    ) -> Error {
        let (chan, _rx, _sink) = working_fake_channel(rt);
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");
            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();
        let unique_id = UniqId::new(23, 17);
        let (circ, mut sink) = newcirc_ext(
            rt,
            unique_id,
            chan,
            hops,
            reply_hop,
            CircParameters::default(),
        )
        .await;
        let params = CircParameters::default();
        let target = example_target();
        #[allow(clippy::clone_on_copy)]
        let rtc = rt.clone();
        let sink_handle = rt
            .spawn_with_handle(async move {
                rtc.sleep(Duration::from_millis(100)).await;
                sink.send(bad_reply).await.unwrap();
                sink
            })
            .unwrap();
        let outcome = circ.extend_ntor(&target, params).await;
        let _sink = sink_handle.await;
        assert_eq!(circ.n_hops().unwrap(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }
    #[traced_test]
    #[test]
    fn bad_extend_wronghop() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![]).into();
            let cc = rmsg_to_ccmsg(None, extended2);
            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
            // This case shows up as a CircDestroy, since a message sent
            // from the wrong hop won't even be delivered to the extend
            // code's meta-handler.  Instead the unexpected message will cause
            // the circuit to get torn down.
            match error {
                Error::CircuitClosed => {}
                x => panic!("got other error: {}", x),
            }
        });
    }
    #[traced_test]
    #[test]
    fn bad_extend_wrongtype() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended = relaymsg::Extended::new(vec![7; 200]).into();
            let cc = rmsg_to_ccmsg(None, extended);
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::BytesErr {
                    err: tor_bytes::Error::InvalidMessage(_),
                    object: "extended2 message",
                } => {}
                other => panic!("{:?}", other),
            }
        });
    }
    #[traced_test]
    #[test]
    fn bad_extend_destroy() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::CircuitClosed => {}
                other => panic!("{:?}", other),
            }
        });
    }
    #[traced_test]
    #[test]
    fn bad_extend_crypto() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
            let cc = rmsg_to_ccmsg(None, extended2);
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            assert!(matches!(error, Error::BadCircHandshakeAuth));
        });
    }
    #[traced_test]
    #[test]
    fn begindir() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            let circid = circ.peek_circid();
            let begin_and_send_fut = async move {
                // Here we'll say we've got a circuit, and we want to
                // make a simple BEGINDIR request with it.
                let mut stream = circ.begin_dir_stream().await.unwrap();
                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
                stream.flush().await.unwrap();
                let mut buf = [0_u8; 1024];
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(n, 0);
                stream
            };
            let reply_fut = async move {
                // We've disabled encryption on this circuit, so we can just
                // read the begindir cell.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
                // Reply with a Connected cell to indicate success.
                let connected = relaymsg::Connected::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Now read a DATA cell...
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(streamid_2, streamid);
                if let AnyRelayMsg::Data(d) = rmsg {
                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
                } else {
                    panic!();
                }
                // Write another data cell in reply!
                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
                    .unwrap()
                    .into();
                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
                // Send an END cell to say that the conversation is over.
                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
                (rx, sink) // gotta keep these alive, or the reactor will exit.
            };
            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
        });
    }
    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
    fn close_stream_helper(by_drop: bool) {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            let stream_fut = async move {
                let stream = circ
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();
                let (r, mut w) = stream.split();
                if by_drop {
                    // Drop the writer and the reader, which should close the stream.
                    drop(r);
                    drop(w);
                    (None, circ) // make sure to keep the circuit alive
                } else {
                    // Call close on the writer, while keeping the reader alive.
                    w.close().await.unwrap();
                    (Some(r), circ)
                }
            };
            let handler_fut = async {
                // Read the BEGIN message.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
                // Reply with a CONNECTED.
                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Expect an END.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (_, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::END);
                (rx, sink) // keep these alive or the reactor will exit.
            };
            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
        });
    }
    #[traced_test]
    #[test]
    fn drop_stream() {
        close_stream_helper(true);
    }
    #[traced_test]
    #[test]
    fn close_stream() {
        close_stream_helper(false);
    }
    // Set up a circuit and stream that expects some incoming SENDMEs.
    async fn setup_incoming_sendme_case<R: Runtime>(
        rt: &R,
        n_to_send: usize,
    ) -> (
        Arc<ClientCirc>,
        DataStream,
        CircuitRxSender,
        Option<StreamId>,
        usize,
        Receiver<AnyChanCell>,
        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    ) {
        let (chan, mut rx, sink2) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let begin_and_send_fut = {
            let circ = circ.clone();
            async move {
                // Take our circuit and make a stream on it.
                let mut stream = circ
                    .begin_stream("www.example.com", 443, None)
                    .await
                    .unwrap();
                let junk = [0_u8; 1024];
                let mut remaining = n_to_send;
                while remaining > 0 {
                    let n = std::cmp::min(remaining, junk.len());
                    stream.write_all(&junk[..n]).await.unwrap();
                    remaining -= n;
                }
                stream.flush().await.unwrap();
                stream
            }
        };
        let receive_fut = async move {
            // Read the begin cell.
            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            let rmsg = match chmsg {
                AnyChanMsg::Relay(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
            // Reply with a connected cell...
            let connected = relaymsg::Connected::new_empty().into();
            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
            // Now read bytes from the stream until we have them all.
            let mut bytes_received = 0_usize;
            let mut cells_received = 0_usize;
            while bytes_received < n_to_send {
                // Read a data cell, and remember how much we got.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(streamid2, streamid);
                if let AnyRelayMsg::Data(dat) = rmsg {
                    cells_received += 1;
                    bytes_received += dat.as_ref().len();
                } else {
                    panic!();
                }
            }
            (sink, streamid, cells_received, rx)
        };
        let (stream, (sink, streamid, cells_received, rx)) =
            futures::join!(begin_and_send_fut, receive_fut);
        (circ, stream, sink, streamid, cells_received, rx, sink2)
    }
    #[traced_test]
    #[test]
    fn accept_valid_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
            assert_eq!(cells_received, 301);
            // Make sure that the circuit is indeed expecting the right sendmes
            {
                let (tx, rx) = oneshot::channel();
                circ.command
                    .unbounded_send(CtrlCmd::QuerySendWindow {
                        hop: 2.into(),
                        leg: circ.unique_id(),
                        done: tx,
                    })
                    .unwrap();
                let (window, tags) = rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 301);
                assert_eq!(tags.len(), 3);
                // 100
                assert_eq!(
                    tags[0],
                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
                );
                // 200
                assert_eq!(
                    tags[1],
                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
                );
                // 300
                assert_eq!(
                    tags[2],
                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
                );
            }
            let reply_with_sendme_fut = async move {
                // make and send a circuit-level sendme.
                let c_sendme =
                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
                        .into();
                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
                // Make and send a stream-level sendme.
                let s_sendme = relaymsg::Sendme::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
                sink
            };
            let _sink = reply_with_sendme_fut.await;
            rt.advance_until_stalled().await;
            // Now make sure that the circuit is still happy, and its
            // window is updated.
            {
                let (tx, rx) = oneshot::channel();
                circ.command
                    .unbounded_send(CtrlCmd::QuerySendWindow {
                        hop: 2.into(),
                        leg: circ.unique_id(),
                        done: tx,
                    })
                    .unwrap();
                let (window, _tags) = rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 201);
            }
        });
    }
    #[traced_test]
    #[test]
    fn invalid_circ_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // Same setup as accept_valid_sendme() test above but try giving
            // a sendme with the wrong tag.
            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
            let reply_with_sendme_fut = async move {
                // make and send a circuit-level sendme with a bad tag.
                let c_sendme =
                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
                        .into();
                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
                sink
            };
            let _sink = reply_with_sendme_fut.await;
            // Check whether the reactor dies as a result of receiving invalid data.
            rt.advance_until_stalled().await;
            assert!(circ.is_closing());
        });
    }
    #[traced_test]
    #[test]
    fn test_busy_stream_fairness() {
        // Number of streams to use.
        const N_STREAMS: usize = 3;
        // Number of cells (roughly) for each stream to send.
        const N_CELLS: usize = 20;
        // Number of bytes that *each* stream will send, and that we'll read
        // from the channel.
        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
        // Ignoring cell granularity, with perfect fairness we'd expect
        // `N_BYTES/N_STREAMS` bytes from each stream.
        //
        // We currently allow for up to a full cell less than that.  This is
        // somewhat arbitrary and can be changed as needed, since we don't
        // provide any specific fairness guarantees.
        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            // Run clients in a single task, doing our own round-robin
            // scheduling of writes to the reactor. Conversely, if we were to
            // put each client in its own task, we would be at the the mercy of
            // how fairly the runtime schedules the client tasks, which is outside
            // the scope of this test.
            rt.spawn({
                // Clone the circuit to keep it alive after writers have
                // finished with it.
                let circ = circ.clone();
                async move {
                    let mut clients = VecDeque::new();
                    struct Client {
                        stream: DataStream,
                        to_write: &'static [u8],
                    }
                    for _ in 0..N_STREAMS {
                        clients.push_back(Client {
                            stream: circ
                                .begin_stream("www.example.com", 80, None)
                                .await
                                .unwrap(),
                            to_write: &[0_u8; N_BYTES][..],
                        });
                    }
                    while let Some(mut client) = clients.pop_front() {
                        if client.to_write.is_empty() {
                            // Client is done. Don't put back in queue.
                            continue;
                        }
                        let written = client.stream.write(client.to_write).await.unwrap();
                        client.to_write = &client.to_write[written..];
                        clients.push_back(client);
                    }
                }
            })
            .unwrap();
            let channel_handler_fut = async {
                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
                let mut total_bytes_received = 0;
                loop {
                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                    let rmsg = match msg {
                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
                            RelayCellFormat::V0,
                            r.into_relay_body(),
                        )
                        .unwrap(),
                        other => panic!("Unexpected chanmsg: {other:?}"),
                    };
                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                    match rmsg.cmd() {
                        RelayCmd::BEGIN => {
                            // Add an entry for this stream.
                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
                            assert_eq!(prev, None);
                            // Reply with a CONNECTED.
                            let connected = relaymsg::Connected::new_with_addr(
                                "10.0.0.1".parse().unwrap(),
                                1234,
                            )
                            .into();
                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                        }
                        RelayCmd::DATA => {
                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
                            let nbytes = data_msg.as_ref().len();
                            total_bytes_received += nbytes;
                            let streamid = streamid.unwrap();
                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
                            *stream_bytes += nbytes;
                            if total_bytes_received >= N_BYTES {
                                break;
                            }
                        }
                        RelayCmd::END => {
                            // Stream is done. If fair scheduling is working as
                            // expected we *probably* shouldn't get here, but we
                            // can ignore it and save the failure until we
                            // actually have the final stats.
                            continue;
                        }
                        other => {
                            panic!("Unexpected command {other:?}");
                        }
                    }
                }
                // Return our stats, along with the `rx` and `sink` to keep the
                // reactor alive (since clients could still be writing).
                (total_bytes_received, stream_bytes_received, rx, sink)
            };
            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
                channel_handler_fut.await;
            assert_eq!(stream_bytes_received.len(), N_STREAMS);
            for (sid, stream_bytes) in stream_bytes_received {
                assert!(
                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
                );
            }
        });
    }
    #[test]
    fn basic_params() {
        use super::CircParameters;
        let mut p = CircParameters::default();
        assert!(p.extend_by_ed25519_id);
        p.extend_by_ed25519_id = false;
        assert!(!p.extend_by_ed25519_id);
    }
    #[cfg(feature = "hs-service")]
    struct AllowAllStreamsFilter;
    #[cfg(feature = "hs-service")]
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
        fn disposition(
            &mut self,
            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
            _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
        }
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests_twice() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, _send) = newcirc(&rt, chan).await;
            let _incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await;
            // There can only be one IncomingStream at a time on any given circuit.
            assert!(incoming.is_err());
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;
            let rfmt = RelayCellFormat::V0;
            // A helper channel for coordinating the "client"/"service" interaction
            let (tx, rx) = oneshot::channel();
            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                let stream = incoming.next().await.unwrap();
                let mut data_stream = stream
                    .accept_data(relaymsg::Connected::new_empty())
                    .await
                    .unwrap();
                // Notify the client task we're ready to accept DATA cells
                tx.send(()).unwrap();
                // Read the data the client sent us
                let mut buf = [0_u8; TEST_DATA.len()];
                data_stream.read_exact(&mut buf).await.unwrap();
                assert_eq!(&buf, TEST_DATA);
                circ
            };
            let simulate_client = async move {
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                // Pretend to be a client at the other end of the circuit sending a begin cell
                send.send(ClientCircChanMsg::Relay(begin_msg))
                    .await
                    .unwrap();
                // Wait until the service is ready to accept data
                // TODO: we shouldn't need to wait! This is needed because the service will reject
                // any DATA cells that aren't associated with a known stream. We need to wait until
                // the service receives our BEGIN cell (and the reactor updates hop.map with the
                // new stream).
                rx.await.unwrap();
                // Now send some data along the newly established circuit..
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);
                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
                send
            };
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn accept_stream_after_reject() {
        use tor_cell::relaycell::msg::BeginFlags;
        use tor_cell::relaycell::msg::EndReason;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            const STREAM_COUNT: usize = 2;
            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;
            // A helper channel for coordinating the "client"/"service" interaction
            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                // Process 2 incoming streams
                for i in 0..STREAM_COUNT {
                    let stream = incoming.next().await.unwrap();
                    // Reject the first one
                    if i == 0 {
                        stream
                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                            .await
                            .unwrap();
                        // Notify the client
                        tx.send(()).await.unwrap();
                        continue;
                    }
                    let mut data_stream = stream
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    // Notify the client task we're ready to accept DATA cells
                    tx.send(()).await.unwrap();
                    // Read the data the client sent us
                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }
                circ
            };
            let simulate_client = async move {
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                // Pretend to be a client at the other end of the circuit sending 2 identical begin
                // cells (the first one will be rejected by the test service).
                for _ in 0..STREAM_COUNT {
                    send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
                        .await
                        .unwrap();
                    // Wait until the service rejects our request
                    rx.next().await.unwrap();
                }
                // Now send some data along the newly established circuit..
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);
                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
                send
            };
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn incoming_stream_bad_hop() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            /// Expect the originator of the BEGIN cell to be hop 1.
            const EXPECTED_HOP: u8 = 1;
            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;
            // Expect to receive incoming streams from hop EXPECTED_HOP
            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    (circ.unique_id(), EXPECTED_HOP.into()).into(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                // The originator of the cell is actually the last hop on the circuit, not hop 1,
                // so we expect the reactor to shut down.
                assert!(incoming.next().await.is_none());
                circ
            };
            let simulate_client = async move {
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                // Pretend to be a client at the other end of the circuit sending a begin cell
                send.send(ClientCircChanMsg::Relay(begin_msg))
                    .await
                    .unwrap();
                send
            };
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_circ_validation() {
        use std::error::Error as _;
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let params = CircParameters::default();
            let invalid_tunnels = [
                setup_bad_conflux_tunnel(&rt).await,
                setup_conflux_tunnel(&rt, true, params).await,
            ];
            for tunnel in invalid_tunnels {
                let TestTunnelCtx {
                    tunnel: _tunnel,
                    circs: _circs,
                    conflux_link_rx,
                } = tunnel;
                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
                let err_src = conflux_hs_err.source().unwrap();
                // The two circuits don't end in the same hop (no join point),
                // so the reactor will refuse to link them
                assert!(err_src
                    .to_string()
                    .contains("one more more conflux circuits are invalid"));
            }
        });
    }
    // TODO: this structure could be reused for the other tests,
    // to address nickm's comment:
    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
    #[derive(Debug)]
    #[allow(unused)]
    #[cfg(feature = "conflux")]
    struct TestCircuitCtx {
        chan_rx: Receiver<AnyChanCell>,
        chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
        circ_tx: CircuitRxSender,
        unique_id: UniqId,
    }
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct TestTunnelCtx {
        tunnel: Arc<ClientCirc>,
        circs: Vec<TestCircuitCtx>,
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    }
    /// Wait for a LINK cell to arrive on the specified channel and return its payload.
    #[cfg(feature = "conflux")]
    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
        // Wait for the LINK cell...
        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
        let rmsg = match chmsg {
            AnyChanMsg::Relay(r) => {
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                    .unwrap()
            }
            other => panic!("{:?}", other),
        };
        let (streamid, rmsg) = rmsg.into_streamid_and_msg();
        let link = match rmsg {
            AnyRelayMsg::ConfluxLink(link) => link,
            _ => panic!("unexpected relay message {rmsg:?}"),
        };
        assert!(streamid.is_none());
        link
    }
    #[cfg(feature = "conflux")]
    async fn setup_conflux_tunnel(
        rt: &MockRuntime,
        same_hops: bool,
        params: CircParameters,
    ) -> TestTunnelCtx {
        let hops1 = hop_details(3, 0);
        let hops2 = if same_hops {
            hops1.clone()
        } else {
            hop_details(3, 10)
        };
        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
        let (circ1, sink1) = newcirc_ext(
            rt,
            UniqId::new(1, 3),
            chan1,
            hops1,
            2.into(),
            params.clone(),
        )
        .await;
        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
        let (circ2, sink2) =
            newcirc_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
        let (answer_tx, answer_rx) = oneshot::channel();
        circ2
            .command
            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
            .unwrap();
        let circuit = answer_rx.await.unwrap().unwrap();
        // The circuit should be shutting down its reactor
        rt.advance_until_stalled().await;
        assert!(circ2.is_closing());
        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
        // Tell the first circuit to link with the second and form a multipath tunnel
        circ1
            .control
            .unbounded_send(CtrlMsg::LinkCircuits {
                circuits: vec![circuit],
                answer: conflux_link_tx,
            })
            .unwrap();
        let circ_ctx1 = TestCircuitCtx {
            chan_rx: rx1,
            chan_tx: chan_sink1,
            circ_tx: sink1,
            unique_id: circ1.unique_id(),
        };
        let circ_ctx2 = TestCircuitCtx {
            chan_rx: rx2,
            chan_tx: chan_sink2,
            circ_tx: sink2,
            unique_id: circ2.unique_id(),
        };
        TestTunnelCtx {
            tunnel: circ1,
            circs: vec![circ_ctx1, circ_ctx2],
            conflux_link_rx,
        }
    }
    #[cfg(feature = "conflux")]
    async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
        // Our 2 test circuits are identical, so they both have the same guards,
        // which technically violates the conflux set rule mentioned in prop354.
        // For testing purposes this is fine, but in production we'll need to ensure
        // the calling code prevents guard reuse (except in the case where
        // one of the guards happens to be Guard + Exit)
        let same_hops = true;
        let params = CircParameters::new(true, build_cc_vegas_params());
        setup_conflux_tunnel(rt, same_hops, params).await
    }
    #[cfg(feature = "conflux")]
    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
        // The two circuits don't share any hops,
        // so they won't end in the same hop (no join point),
        // causing the reactor to refuse to link them.
        let same_hops = false;
        let params = CircParameters::new(true, build_cc_vegas_params());
        setup_conflux_tunnel(rt, same_hops, params).await
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn reject_conflux_linked_before_hs() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            let nonce = V1Nonce::new(&mut testing_rng());
            let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
            // Send a LINKED cell
            let linked = relaymsg::ConfluxLinked::new(payload).into();
            sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
            rt.advance_until_stalled().await;
            assert!(circ.is_closing());
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_hs_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let TestTunnelCtx {
                tunnel: _tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;
            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            // Wait for the LINK cell
            let link = await_link_payload(&mut circ1.chan_rx).await;
            // Send a LINK cell on the first leg...
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ1
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            // Do nothing, and wait for the handshake to timeout on the second leg
            rt.advance_by(Duration::from_secs(60)).await;
            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            // Get the handshake results of each circuit
            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
                conflux_hs_res.try_into().unwrap();
            assert!(res1.is_ok());
            let err = res2.unwrap_err();
            assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_hs() {
        use crate::util::err::ConfluxHandshakeError;
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let nonce = V1Nonce::new(&mut testing_rng());
            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
            //let extended2 = relaymsg::Extended2::new(vec![]).into();
            let bad_hs_responses = [
                (
                    rmsg_to_ccmsg(
                        None,
                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
                    ),
                    "Received CONFLUX_LINKED cell with mismatched nonce",
                ),
                (
                    rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
                ),
                (
                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
                    "Received CONFLUX_SWITCH on unlinked circuit?!",
                ),
                // TODO: this currently causes the reactor to shut down immediately,
                // without sending a response on the handshake channel
                /*
                (
                    rmsg_to_ccmsg(None, extended2),
                    "Received CONFLUX_LINKED cell with mismatched nonce",
                ),
                */
            ];
            for (bad_cell, expected_err) in bad_hs_responses {
                let TestTunnelCtx {
                    tunnel,
                    circs,
                    conflux_link_rx,
                } = setup_good_conflux_tunnel(&rt).await;
                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
                // Respond with a bogus cell on one of the legs
                circ2.circ_tx.send(bad_cell).await.unwrap();
                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
                // Get the handshake results (the handshake results are reported early,
                // without waiting for the second circuit leg's handshake to timeout,
                // because this is a protocol violation causing the entire tunnel to shut down)
                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
                    conflux_hs_res.try_into().unwrap();
                match res2.unwrap_err() {
                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
                        assert_eq!(e, expected_err);
                    }
                    e => panic!("unexpected error: {e:?}"),
                }
                assert!(tunnel.is_closing());
            }
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn unexpected_conflux_cell() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let nonce = V1Nonce::new(&mut testing_rng());
            let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
            let bad_cells = [
                rmsg_to_ccmsg(
                    None,
                    relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
                ),
                rmsg_to_ccmsg(
                    None,
                    relaymsg::ConfluxLink::new(link_payload.clone()).into(),
                ),
                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
            ];
            for bad_cell in bad_cells {
                let (chan, mut _rx, _sink) = working_fake_channel(&rt);
                let (circ, mut sink) = newcirc(&rt, chan).await;
                sink.send(bad_cell).await.unwrap();
                rt.advance_until_stalled().await;
                // Note: unfortunately we can't assert the circuit is
                // closing for the reason, because the reactor just logs
                // the error and then exits.
                assert!(circ.is_closing());
            }
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_linked() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx: _,
            } = setup_good_conflux_tunnel(&rt).await;
            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            let link = await_link_payload(&mut circ1.chan_rx).await;
            // Send a LINK cell on the first leg...
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ1
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            // ...and two LINKED cells on the second
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ2
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ2
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            rt.advance_until_stalled().await;
            // Receiving a LINKED cell on an already linked leg causes
            // the tunnel to be torn down
            assert!(tunnel.is_closing());
        });
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_switch() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let bad_switch = [
                // SWITCH cells with seqno = 0 are not allowed
                relaymsg::ConfluxSwitch::new(0),
                // TODO(#2031): from c-tor:
                //
                // We have to make sure that the switch command is truly
                // incrementing the sequence number, or else it becomes
                // a side channel that can be spammed for traffic analysis.
                //
                // We should figure out what this check is supposed to look like,
                // and have a test for it
            ];
            for bad_cell in bad_switch {
                let TestTunnelCtx {
                    tunnel,
                    circs,
                    conflux_link_rx,
                } = setup_good_conflux_tunnel(&rt).await;
                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
                let link = await_link_payload(&mut circ1.chan_rx).await;
                // Send a LINKED cell on both legs
                for circ in [&mut circ1, &mut circ2] {
                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                    circ.circ_tx
                        .send(rmsg_to_ccmsg(None, linked))
                        .await
                        .unwrap();
                }
                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
                // Now send a bad SWITCH cell on *both* legs.
                // This will cause both legs to be removed from the conflux set,
                // which causes the tunnel reactor to shut down
                for circ in [&mut circ1, &mut circ2] {
                    let msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
                    circ.circ_tx.send(msg).await.unwrap();
                }
                // The tunnel should be shutting down
                rt.advance_until_stalled().await;
                assert!(tunnel.is_closing());
            }
        });
    }
    // TODO(conflux): add a test for SWITCH handling
    /// Run a conflux test endpoint.
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
        /// Pretend to be an exit relay.
        Relay(ConfluxExitState<I>),
        /// Client task.
        Client {
            /// Channel for receiving the outcome of the conflux handshakes.
            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
            /// The tunnel reactor handle
            tunnel: Arc<ClientCirc>,
            /// Data to send on a stream.
            send_data: Vec<u8>,
            /// Data we expect to receive on a stream.
            recv_data: Vec<u8>,
        },
    }
    /// Structure for returning the sinks, channels, etc. that must stay
    /// alive until the test is complete.
    #[allow(unused, clippy::large_enum_variant)]
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    enum ConfluxEndpointResult {
        Circuit {
            tunnel: Arc<ClientCirc>,
            stream: DataStream,
        },
        Relay {
            circ: TestCircuitCtx,
        },
    }
    /// Stream data, shared by all the mock exit endpoints.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxStreamState {
        /// The data received so far on this stream (at the exit).
        data_recvd: Vec<u8>,
        /// The total amount of data we expect to receive on this stream.
        expected_data_len: usize,
        /// Whether we have seen a BEGIN cell yet.
        begin_recvd: bool,
        /// Whether we have seen an END cell yet.
        end_recvd: bool,
        /// Whether we have sent an END cell yet.
        end_sent: bool,
    }
    #[cfg(feature = "conflux")]
    impl ConfluxStreamState {
        fn new(expected_data_len: usize) -> Self {
            Self {
                data_recvd: vec![],
                expected_data_len,
                begin_recvd: false,
                end_recvd: false,
                end_sent: false,
            }
        }
    }
    /// An object describing a SWITCH cell that we expect to receive
    /// in the mock exit
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ExpectedSwitch {
        /// The number of cells we've seen on this leg so far,
        /// up to and including the SWITCH.
        cells_so_far: usize,
        /// The expected seqno in SWITCH cell,
        seqno: u32,
    }
    /// Object dispatching cells for delivery on the appropriate
    /// leg in a multipath tunnel.
    ///
    /// Used to send out-of-order cells from the mock exit
    /// to the client under test.
    #[cfg(feature = "conflux")]
    struct CellDispatcher {
        /// Channels on which to send the [`CellToSend`] commands on.
        leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
        /// The list of cells to send,
        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    }
    #[cfg(feature = "conflux")]
    impl CellDispatcher {
        async fn run(mut self) {
            while !self.cells_to_send.is_empty() {
                let (circ_id, cell) = self.cells_to_send.remove(0);
                let cell_tx = self.leg_tx.get_mut(&circ_id).unwrap();
                let (done_tx, done_rx) = oneshot::channel();
                cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
                // Wait for the cell to be sent before sending the next one.
                let () = done_rx.await.unwrap();
            }
        }
    }
    /// A cell for the mock exit to send on one of its legs.
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    struct CellToSend {
        /// Channel for notifying the control task that the cell was sent.
        done_tx: oneshot::Sender<()>,
        /// The cell to send.
        cell: AnyRelayMsg,
    }
    /// The state of a mock exit.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
        /// The runtime, shared by the test client and mock exit tasks.
        ///
        /// The mutex prevents the client and mock exit tasks from calling
        /// functions like [`MockRuntime::advance_until_stalled`]
        /// or [`MockRuntime::progress_until_stalled]` concurrently,
        /// as this is not supported by the mock runtime.
        runtime: Arc<AsyncMutex<MockRuntime>>,
        /// The client view of the tunnel.
        tunnel: Arc<ClientCirc>,
        /// The circuit test context.
        circ: TestCircuitCtx,
        /// The RTT delay to introduce just before each SENDME.
        ///
        /// Used to trigger the client to send a SWITCH.
        rtt_delays: I,
        /// State of the (only) expected stream on this tunnel,
        /// shared by all the mock exit endpoints.
        stream_state: Arc<Mutex<ConfluxStreamState>>,
        /// The number of cells after which to expect a SWITCH
        /// cell from the client.
        expect_switch: Vec<ExpectedSwitch>,
        /// Channel for receiving notifications from the other leg.
        event_rx: mpsc::Receiver<MockExitEvent>,
        /// Channel for sending notifications to the other leg.
        event_tx: mpsc::Sender<MockExitEvent>,
        /// Whether this circuit leg should act as the primary (sending) leg.
        is_sending_leg: bool,
        /// A channel for receiving cells to send on this stream.
        cells_rx: mpsc::Receiver<CellToSend>,
    }
    #[cfg(feature = "conflux")]
    async fn good_exit_handshake(
        runtime: &Arc<AsyncMutex<MockRuntime>>,
        init_rtt_delay: Option<Duration>,
        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
        sink: &mut CircuitRxSender,
    ) {
        // Wait for the LINK cell
        let link = await_link_payload(rx).await;
        // Introduce an artificial delay, to make one circ have a better initial RTT
        // than the other
        if let Some(init_rtt_delay) = init_rtt_delay {
            runtime.lock().await.advance_by(init_rtt_delay).await;
        }
        // Reply with a LINKED cell...
        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
        sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
        // Wait for the client to respond with LINKED_ACK...
        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
        let rmsg = match chmsg {
            AnyChanMsg::Relay(r) => {
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                    .unwrap()
            }
            other => panic!("{other:?}"),
        };
        let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
        assert!(matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_)));
    }
    /// An event sent by one mock conflux leg to another.
    #[derive(Copy, Clone, Debug)]
    enum MockExitEvent {
        /// Inform the other leg we are done.
        Done,
        /// Inform the other leg a stream was opened.
        BeginRecvd(StreamId),
    }
    #[cfg(feature = "conflux")]
    async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
        state: ConfluxExitState<I>,
    ) -> ConfluxEndpointResult {
        let ConfluxExitState {
            runtime,
            tunnel,
            mut circ,
            rtt_delays,
            stream_state,
            mut expect_switch,
            mut event_tx,
            mut event_rx,
            is_sending_leg,
            mut cells_rx,
        } = state;
        let mut rtt_delays = rtt_delays.into_iter();
        // Expect the client to open a stream, and de-multiplex the received stream data
        let stream_len = stream_state.lock().unwrap().expected_data_len;
        let mut data_cells_received = 0_usize;
        let mut cell_count = 0_usize;
        let mut tags = vec![];
        let mut streamid = None;
        let mut done_writing = false;
        loop {
            let should_exit = {
                let stream_state = stream_state.lock().unwrap();
                let done_reading = stream_state.data_recvd.len() >= stream_len;
                (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
            };
            if should_exit {
                break;
            }
            use futures::select;
            // Only start reading from the dispatcher channel after the stream is open
            // and we're ready to start sending cells.
            let mut next_cell = if streamid.is_some() && !done_writing {
                Box::pin(cells_rx.next().fuse())
                    as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
            } else {
                Box::pin(std::future::pending().fuse())
            };
            // Wait for the BEGIN cell to arrive, or for the transfer to complete
            // (we need to bail if the other leg already completed);
            let res = select! {
                res = circ.chan_rx.next() => {
                    res.unwrap()
                },
                res = event_rx.next() => {
                    let Some(event) = res else {
                        break;
                    };
                    match event {
                        MockExitEvent::Done => {
                            break;
                        },
                        MockExitEvent::BeginRecvd(id) => {
                            // The stream is now open (the other leg received the BEGIN),
                            // so we're reading to start reading cells from the cell dispatcher.
                            streamid = Some(id);
                            continue;
                        },
                    }
                }
                res = next_cell => {
                    if let Some(cell_to_send) = res {
                        let CellToSend { cell, done_tx } = cell_to_send;
                        // SWITCH cells don't have a stream ID
                        let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
                            None
                        } else {
                            streamid
                        };
                        circ.circ_tx
                            .send(rmsg_to_ccmsg(streamid, cell))
                            .await
                            .unwrap();
                        runtime.lock().await.advance_until_stalled().await;
                        done_tx.send(()).unwrap();
                    } else {
                        done_writing = true;
                    }
                    continue;
                }
            };
            let (_id, chmsg) = res.into_circid_and_msg();
            cell_count += 1;
            let rmsg = match chmsg {
                AnyChanMsg::Relay(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
            if streamid.is_none() {
                streamid = new_streamid;
            }
            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
            let end_recvd = stream_state.lock().unwrap().end_recvd;
            match rmsg {
                AnyRelayMsg::Begin(_) if begin_recvd => {
                    panic!("client tried to open two streams?!");
                }
                AnyRelayMsg::Begin(_) if !begin_recvd => {
                    stream_state.lock().unwrap().begin_recvd = true;
                    // Reply with a connected cell...
                    let connected = relaymsg::Connected::new_empty().into();
                    circ.circ_tx
                        .send(rmsg_to_ccmsg(streamid, connected))
                        .await
                        .unwrap();
                    // Tell the other leg we received a BEGIN cell
                    event_tx
                        .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
                        .await
                        .unwrap();
                }
                AnyRelayMsg::End(_) if !end_recvd => {
                    stream_state.lock().unwrap().end_recvd = true;
                    break;
                }
                AnyRelayMsg::End(_) if end_recvd => {
                    panic!("received two END cells for the same stream?!");
                }
                AnyRelayMsg::ConfluxSwitch(cell) => {
                    // Ensure we got the SWITCH after the expected number of cells
                    let expected = expect_switch.remove(0);
                    assert_eq!(expected.cells_so_far, cell_count);
                    assert_eq!(expected.seqno, cell.seqno());
                    // To keep the tests simple, we don't handle out of order cells,
                    // and simply sort the received data at the end.
                    // This ensures all the data was actually received,
                    // but it doesn't actually test that the SWITCH cells
                    // contain the appropriate seqnos.
                    continue;
                }
                AnyRelayMsg::Data(dat) => {
                    data_cells_received += 1;
                    stream_state
                        .lock()
                        .unwrap()
                        .data_recvd
                        .extend_from_slice(dat.as_ref());
                    let is_next_cell_sendme = data_cells_received % 31 == 0;
                    if is_next_cell_sendme {
                        if tags.is_empty() {
                            // Important: we need to make sure all the SENDMEs
                            // we sent so far have been processed by the reactor
                            // (otherwise the next QuerySendWindow call
                            // might return an outdated list of tags!)
                            runtime.lock().await.advance_until_stalled().await;
                            let (tx, rx) = oneshot::channel();
                            tunnel
                                .command
                                .unbounded_send(CtrlCmd::QuerySendWindow {
                                    hop: 2.into(),
                                    leg: circ.unique_id,
                                    done: tx,
                                })
                                .unwrap();
                            // Get a fresh batch of tags.
                            let (_window, new_tags) = rx.await.unwrap().unwrap();
                            tags = new_tags;
                        }
                        let tag = tags.remove(0);
                        // Introduce an artificial delay, to make one circ have worse RTT
                        // than the other, and thus trigger a SWITCH
                        if let Some(rtt_delay) = rtt_delays.next().flatten() {
                            runtime.lock().await.advance_by(rtt_delay).await;
                        }
                        // Make and send a circuit-level SENDME
                        let sendme = relaymsg::Sendme::from(tag).into();
                        circ.circ_tx
                            .send(rmsg_to_ccmsg(None, sendme))
                            .await
                            .unwrap();
                    }
                }
                _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
            }
        }
        let end_recvd = stream_state.lock().unwrap().end_recvd;
        // Close the stream if the other endpoint hasn't already done so
        if is_sending_leg && !end_recvd {
            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
            circ.circ_tx
                .send(rmsg_to_ccmsg(streamid, end))
                .await
                .unwrap();
            stream_state.lock().unwrap().end_sent = true;
        }
        // This is allowed to fail, because the other leg might have exited first.
        let _ = event_tx.send(MockExitEvent::Done).await;
        // Ensure we received all the switch cells we were expecting
        assert!(
            expect_switch.is_empty(),
            "expect_switch = {expect_switch:?}"
        );
        ConfluxEndpointResult::Relay { circ }
    }
    #[cfg(feature = "conflux")]
    async fn run_conflux_client(
        tunnel: Arc<ClientCirc>,
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
        send_data: Vec<u8>,
        recv_data: Vec<u8>,
    ) -> ConfluxEndpointResult {
        let res = conflux_link_rx.await;
        let res = res.unwrap().unwrap();
        assert_eq!(res.len(), 2);
        // All circuit legs have completed the conflux handshake,
        // so we now have a multipath tunnel
        // Now we're ready to open a stream
        let mut stream = tunnel
            .begin_stream("www.example.com", 443, None)
            .await
            .unwrap();
        stream.write_all(&send_data).await.unwrap();
        stream.flush().await.unwrap();
        let mut recv: Vec<u8> = Vec::new();
        let recv_len = stream.read_to_end(&mut recv).await.unwrap();
        assert_eq!(recv_len, recv_data.len());
        assert_eq!(recv_data, recv);
        ConfluxEndpointResult::Circuit { tunnel, stream }
    }
    #[cfg(feature = "conflux")]
    async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
        endpoint: ConfluxTestEndpoint<I>,
    ) -> ConfluxEndpointResult {
        match endpoint {
            ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
            ConfluxTestEndpoint::Client {
                tunnel,
                conflux_link_rx,
                send_data,
                recv_data,
            } => run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await,
        }
    }
    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
    // with 2 legs, opens a stream and sends 300 DATA cells on it.
    //
    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
    // which mock the behavior of an exit. The two relay tasks introduce
    // artificial delays before each SENDME sent to the client,
    // in order to trigger it to switch its sending leg predictably.
    //
    // The mock exit does not send any data on the stream.
    //
    // This test checks that the client sends SWITCH cells at the right time,
    // and that all the data it sent over the stream arrived at the exit.
    //
    // Note, however, that it doesn't check that the client sends the data in
    // the right order. For simplicity, the test concatenates the data received
    // on both legs, sorts it, and then compares it against the of the data sent
    // by the client (TODO: improve this)
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_client_to_exit() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            /// The number of data cells to send.
            const NUM_CELLS: usize = 300;
            /// 498 bytes per DATA cell.
            const CELL_SIZE: usize = 498;
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;
            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            // The stream data we're going to send over the conflux tunnel
            let mut send_data = (0..255_u8)
                .cycle()
                .take(NUM_CELLS * CELL_SIZE)
                .collect::<Vec<_>>();
            let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
            let mut tasks = vec![];
            // Channels used by the mock relays to notify each other
            // of various events.
            let (tx1, rx1) = mpsc::channel(1);
            let (tx2, rx2) = mpsc::channel(1);
            // The 9 RTT delays to insert before each of the 9 SENDMEs
            // the exit will end up sending.
            //
            // Note: the first delay is the init_rtt delay (measured during the conflux HS).
            let circ1_rtt_delays = [
                // Initially, circ1 has better RTT, so we will start on this leg.
                Some(Duration::from_millis(100)),
                // But then its RTT takes a turn for the worse,
                // triggering a switch after the first SENDME is processed
                // (this happens after sending 123 DATA cells).
                Some(Duration::from_millis(500)),
                Some(Duration::from_millis(700)),
                Some(Duration::from_millis(900)),
                Some(Duration::from_millis(1100)),
                Some(Duration::from_millis(1300)),
                Some(Duration::from_millis(1500)),
                Some(Duration::from_millis(1700)),
                Some(Duration::from_millis(1900)),
                Some(Duration::from_millis(2100)),
            ]
            .into_iter();
            let circ2_rtt_delays = [
                Some(Duration::from_millis(200)),
                Some(Duration::from_millis(400)),
                Some(Duration::from_millis(600)),
                Some(Duration::from_millis(800)),
                Some(Duration::from_millis(1000)),
                Some(Duration::from_millis(1200)),
                Some(Duration::from_millis(1400)),
                Some(Duration::from_millis(1600)),
                Some(Duration::from_millis(1800)),
                Some(Duration::from_millis(2000)),
            ]
            .into_iter();
            let expected_switches1 = vec![ExpectedSwitch {
                // We start on this leg, and receive a BEGIN cell,
                // followed by (4 * 31 - 1) = 123 DATA cells.
                // Then it becomes blocked on CC, then finally the reactor
                // realizes it has some SENDMEs to process, and
                // then as a result of the new RTT measurement, we switch to circ1,
                // and then finally we switch back here, and get another SWITCH
                // as the 126th cell.
                cells_so_far: 126,
                // Leg 2 switches back to this leg after the 249th cell
                // (just before sending the 250th one):
                // seqno = 125 carried over from leg 1 (see the seqno of the
                // SWITCH expected on leg 2 below), plus 1 SWITCH, plus
                // 4 * 31 = 124 DATA cells after which the RTT of the first leg
                // is deemed favorable again.
                //
                // 249 - 125 (last_seq_sent of leg 1) = 124
                seqno: 124,
            }];
            let expected_switches2 = vec![ExpectedSwitch {
                // The SWITCH is the first cell we received after the conflux HS
                // on this leg.
                cells_so_far: 1,
                // See explanation on the ExpectedSwitch from circ1 above.
                seqno: 125,
            }];
            let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
            // Drop the senders and close the channels,
            // we have nothing to send in this test.
            let (_, cells_rx1) = mpsc::channel(1);
            let (_, cells_rx2) = mpsc::channel(1);
            let relay1 = ConfluxExitState {
                runtime: Arc::clone(&relay_runtime),
                tunnel: Arc::clone(&tunnel),
                circ: circ1,
                rtt_delays: circ1_rtt_delays,
                stream_state: Arc::clone(&stream_state),
                expect_switch: expected_switches1,
                event_tx: tx1,
                event_rx: rx2,
                is_sending_leg: true,
                cells_rx: cells_rx1,
            };
            let relay2 = ConfluxExitState {
                runtime: Arc::clone(&relay_runtime),
                tunnel: Arc::clone(&tunnel),
                circ: circ2,
                rtt_delays: circ2_rtt_delays,
                stream_state: Arc::clone(&stream_state),
                expect_switch: expected_switches2,
                event_tx: tx2,
                event_rx: rx1,
                is_sending_leg: false,
                cells_rx: cells_rx2,
            };
            for mut mock_relay in [relay1, relay2] {
                let leg = mock_relay.circ.unique_id;
                // Do the conflux handshake
                //
                // We do this outside of run_conflux_endpoint,
                // toa void running both handshakes at concurrently
                // (this gives more predictable RTT delays:
                // if both handshake tasks run at once, they race
                // to advance the mock runtime's clock)
                good_exit_handshake(
                    &relay_runtime,
                    mock_relay.rtt_delays.next().flatten(),
                    &mut mock_relay.circ.chan_rx,
                    &mut mock_relay.circ.circ_tx,
                )
                .await;
                let relay = ConfluxTestEndpoint::Relay(mock_relay);
                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
            }
            tasks.push(rt.spawn_join(
                "client task".to_string(),
                run_conflux_endpoint(ConfluxTestEndpoint::Client {
                    tunnel,
                    conflux_link_rx,
                    send_data: send_data.clone(),
                    recv_data: vec![],
                }),
            ));
            let _sinks = futures::future::join_all(tasks).await;
            let mut stream_state = stream_state.lock().unwrap();
            assert!(stream_state.begin_recvd);
            stream_state.data_recvd.sort();
            send_data.sort();
            assert_eq!(stream_state.data_recvd, send_data);
        });
    }
    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
    // with 2 legs, opens a stream and reads from the stream until the stream is closed.
    //
    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
    // which mock the behavior of an exit. The two tasks send DATA and SWITCH
    // cells on the two circuit "legs" such that some cells arrive out of order.
    // This forces the client to buffer some cells, and then reorder them when
    // the missing cells finally arrive.
    //
    // The client does not send any data on the stream.
    #[cfg(feature = "conflux")]
    async fn run_multipath_exit_to_client_test(
        rt: MockRuntime,
        tunnel: TestTunnelCtx,
        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
        send_data: Vec<u8>,
        recv_data: Vec<u8>,
    ) -> Arc<Mutex<ConfluxStreamState>> {
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = tunnel;
        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
        let mut tasks = vec![];
        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
        let (cells_tx1, cells_rx1) = mpsc::channel(1);
        let (cells_tx2, cells_rx2) = mpsc::channel(1);
        let dispatcher = CellDispatcher {
            leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
                .into_iter()
                .collect(),
            cells_to_send,
        };
        // Channels used by the mock relays to notify each other
        // of various events.
        let (tx1, rx1) = mpsc::channel(1);
        let (tx2, rx2) = mpsc::channel(1);
        let relay1 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ1,
            rtt_delays: [].into_iter(),
            stream_state: Arc::clone(&stream_state),
            // Expect no SWITCH cells from the client
            expect_switch: vec![],
            event_tx: tx1,
            event_rx: rx2,
            is_sending_leg: false,
            cells_rx: cells_rx1,
        };
        let relay2 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ2,
            rtt_delays: [].into_iter(),
            stream_state: Arc::clone(&stream_state),
            // Expect no SWITCH cells from the client
            expect_switch: vec![],
            event_tx: tx2,
            event_rx: rx1,
            is_sending_leg: true,
            cells_rx: cells_rx2,
        };
        // Run the cell dispatcher, which tells each exit leg task
        // what cells to write.
        //
        // This enables us to write out-of-order cells deterministically.
        rt.spawn(dispatcher.run()).unwrap();
        for mut mock_relay in [relay1, relay2] {
            let leg = mock_relay.circ.unique_id;
            good_exit_handshake(
                &relay_runtime,
                mock_relay.rtt_delays.next().flatten(),
                &mut mock_relay.circ.chan_rx,
                &mut mock_relay.circ.circ_tx,
            )
            .await;
            let relay = ConfluxTestEndpoint::Relay(mock_relay);
            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
        }
        tasks.push(rt.spawn_join(
            "client task".to_string(),
            run_conflux_endpoint(ConfluxTestEndpoint::Client {
                tunnel,
                conflux_link_rx,
                send_data: send_data.clone(),
                recv_data,
            }),
        ));
        // Wait for all the tasks to complete
        let _sinks = futures::future::join_all(tasks).await;
        stream_state
    }
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_exit_to_client() {
        // The data we expect the client to read from the stream
        const TO_SEND: &[u8] =
            b"But something about Buster Friendly irritated John Isidore, one specific thing";
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // The indices of the tunnel legs.
            const CIRC1: usize = 0;
            const CIRC2: usize = 1;
            // The client receives the following cells, in the order indicated
            // by the t0-t8 "timestamps" (where C = CONNECTED, D = DATA, E = END,
            // S = SWITCH):
            //
            //  Leg 1 (CIRC1):   -----------D--------------------- D -- D -- C
            //                              |                      |    |    | \
            //                              |                      |    |    |  v
            //                              |                      |    |    | client
            //                              |                      |    |    |  ^
            //                              |                      |    |    |/
            //  Leg 2 (CIRC2): E - D -- D --\--- D* -- S (seqno=4)-/----/----/
            //                 |   |    |   |    |       |         |    |    |
            //                 |   |    |   |    |       |         |    |    |
            //                 |   |    |   |    |       |         |    |    |
            //  Time:          t8  t7   t6  t5   t4      t3        t2   t1  t0
            //
            //
            //  The cells marked with * are out of order.
            //
            // Note: t0 is the time when the client receives the first cell,
            // and t8 is the time when it receives the last one.
            // In other words, this test simulates a mock exit that "sent" the cells
            // in the order t0, t1, t2, t5, t4, t6, t7, t8
            let simple_switch = vec![
                (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
                (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
                // Switch to sending on the second leg
                (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
                // An out of order cell!
                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
                // The missing cell (as indicated by seqno = 4 from the switch cell above)
                // is finally arriving on leg1
                (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
                (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
                (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
            ];
            //  Leg 1 (CIRC1): ---------------- D  ------D* --- S(seqno = 3) -- D - D ---------------------------- C
            //                                  |        |          |           |   |                              | \
            //                                  |        |          |           |   |                              |  v
            //                                  |        |          |           |   |                              |  client
            //                                  |        |          |           |   |                              |  ^
            //                                  |        |          |           |   |                              | /
            //  Leg 2 (CIRC2): E - S(seqno = 2) \ -- D --\----------\---------- \ --\--- D* -- D* - S(seqno = 3) --/
            //                 |        |       |    |   |          |           |   |    |     |         |         |
            //                 |        |       |    |   |          |           |   |    |     |         |         |
            //                 |        |       |    |   |          |           |   |    |     |         |         |
            //  Time:          t11      t10     t9   t8  t7         t6          t5  t4   t3    t2        t1        t0
            //  =====================================================================================================
            //  Leg 1 LSR:      8        8      8 7  7   7          6           3   2    1      1        1         1
            //  Leg 2 LSR:      9        8      6 6  6   5          5           5   5    5      4        3         0
            //  LSD:            9        8      8 7  6   5          5       5   3   2    1      1        1         1
            //                                    ^ OOO cell is delivered   ^ the OOO cells are delivered to the stream
            //
            //
            //  (LSR = last seq received, LSD = last seq delivered, both from the client's POV)
            //
            //
            // The client keeps track of the `last_seqno_received` (LSR) on each leg.
            // This is incremented for each cell that counts towards the seqnos (BEGIN, DATA, etc.)
            // that is received on the leg. The client also tracks the `last_seqno_delivered` (LSD),
            // which is the seqno of the last cell delivered to a stream
            // (this is global for the whole tunnel, whereas the LSR is different for each leg).
            //
            // When switching to leg `N`, the seqno in the switch is, from the POV of the sender,
            // the delta between the absolute seqno (i.e. the total number of cells[^1] sent)
            // and the value of this absolute seqno when leg `N` was last used.
            //
            // At the time of the first SWITCH from `t1`, the exit "sent" 3 cells:
            // a `CONNECTED` cell, which was received by the client at `t0`, and 2 `DATA` cells that
            // haven't been received yet. At this point, the exit decides to switch to leg 2,
            // on which it hasn't sent any cells yet, so the seqno is set to `3 - 0 = 3`.
            //
            // At `t6` when the exit sends the second switch (leg 2 -> leg 1), has "sent" 6 cells
            // (`C` plus the data cells that are received at `t1 - 5` and `t8`.
            // The seqno is `6 - 3 = 3`, because when it last sent on leg 1,
            // the absolute seqno was `3`.
            //
            // At `t10`, the absolute seqno is 8 (8 qualifying cells have been sent so far).
            // When the exit last sent on leg 2 (which we are switching to),
            // the absolute seqno was `6`, so the `SWITCH` cell will have `8 - 6 = 2` as the seqno.
            //
            // [^1]: only counting the cells that count towards sequence numbers
            let multiple_switches = vec![
                // Immediately switch to sending on the second leg
                // (indicating that we've already sent 3 cells (including the CONNECTED)
                (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
                // Two out of order cells!
                (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
                // The missing cells finally arrive on the first leg
                (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
                (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
                // Switch back to the first leg
                (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
                // OOO cell
                (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
                // Missing cell is received
                (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
                // The remaining cells are in-order
                (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
                // Switch right after we've sent all the data we had to send
                (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
            ];
            // TODO: give these tests the ability to control when END cells are sent
            // (currently we have ensure the is_sending_leg is set to true
            // on the leg that ends up sending the last data cell).
            //
            // TODO: test the edge cases
            let tests = [simple_switch, multiple_switches];
            for cells_to_send in tests {
                let tunnel = setup_good_conflux_tunnel(&rt).await;
                assert_eq!(tunnel.circs.len(), 2);
                let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
                let cells_to_send = cells_to_send
                    .into_iter()
                    .map(|(i, cell)| (circ_ids[i], cell))
                    .collect();
                // The client won't be sending any DATA cells on this stream
                let send_data = vec![];
                let stream_state = run_multipath_exit_to_client_test(
                    rt.clone(),
                    tunnel,
                    cells_to_send,
                    send_data.clone(),
                    TO_SEND.into(),
                )
                .await;
                let stream_state = stream_state.lock().unwrap();
                assert!(stream_state.begin_recvd);
                // We don't expect the client to have sent anything
                assert!(stream_state.data_recvd.is_empty());
            }
        });
    }
    #[traced_test]
    #[test]
    #[cfg(all(feature = "conflux", feature = "hs-service"))]
    fn conflux_incoming_stream() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            use std::error::Error as _;
            const EXPECTED_HOP: u8 = 1;
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;
            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            let link = await_link_payload(&mut circ1.chan_rx).await;
            for circ in [&mut circ1, &mut circ2] {
                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                circ.circ_tx
                    .send(rmsg_to_ccmsg(None, linked))
                    .await
                    .unwrap();
            }
            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
            // TODO(#2002): we don't currently support conflux for onion services
            let err = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    (tunnel.unique_id(), EXPECTED_HOP.into()).into(),
                    AllowAllStreamsFilter,
                )
                .await
                // IncomingStream doesn't impl Debug, so we need to map to a different type
                .map(|_| ())
                .unwrap_err();
            let err_src = err.source().unwrap();
            assert!(err_src
                .to_string()
                .contains("Cannot allow stream requests on tunnel with 2 legs"));
        });
    }
}