1
//! Multi-hop paths over the Tor network.
2
//!
3
//! Right now, we only implement "client circuits" -- also sometimes
//! called "origin circuits".  A client circuit is one that is
//! constructed by this Tor instance, and used on its own behalf to
//! send data over the Tor network.
7
//!
8
//! Each circuit has multiple hops over the Tor network: each hop
9
//! knows only the hop before and the hop after.  The client shares a
10
//! separate set of keys with each hop.
11
//!
12
//! To build a circuit, first create a [crate::channel::Channel], then
13
//! call its [crate::channel::Channel::new_circ] method.  This yields
14
//! a [PendingClientCirc] object that won't become live until you call
15
//! one of the methods
16
//! (typically [`PendingClientCirc::create_firsthop`])
17
//! that extends it to its first hop.  After you've
18
//! done that, you can call [`ClientCirc::extend`] on the circuit to
19
//! build it into a multi-hop circuit.  Finally, you can use
20
//! [ClientCirc::begin_stream] to get a Stream object that can be used
21
//! for anonymized data.
22
//!
23
//! # Implementation
24
//!
25
//! Each open circuit has a corresponding Reactor object that runs in
26
//! an asynchronous task, and manages incoming cells from the
27
//! circuit's upstream channel.  These cells are either RELAY cells or
28
//! DESTROY cells.  DESTROY cells are handled immediately.
29
//! RELAY cells are either for a particular stream, in which case they
30
//! get forwarded to a RawCellStream object, or for no particular stream,
31
//! in which case they are considered "meta" cells (like EXTENDED2)
32
//! that should only get accepted if something is waiting for them.
33
//!
34
//! # Limitations
35
//!
36
//! This is client-only.
37

            
38
pub(crate) mod celltypes;
39
pub(crate) mod halfcirc;
40

            
41
#[cfg(feature = "hs-common")]
42
pub mod handshake;
43
#[cfg(not(feature = "hs-common"))]
44
pub(crate) mod handshake;
45

            
46
pub(super) mod path;
47
pub(crate) mod unique_id;
48

            
49
use crate::channel::Channel;
50
use crate::congestion::params::CongestionControlParams;
51
use crate::crypto::cell::HopNum;
52
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53
use crate::memquota::{CircuitAccount, SpecificAccount as _};
54
use crate::stream::{
55
    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56
    StreamReader,
57
};
58
use crate::tunnel::circuit::celltypes::*;
59
use crate::tunnel::reactor::CtrlCmd;
60
use crate::tunnel::reactor::{
61
    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62
};
63
use crate::tunnel::{LegId, StreamTarget, TargetHop};
64
use crate::util::skew::ClockSkew;
65
use crate::{Error, ResolveError, Result};
66
use educe::Educe;
67
use path::HopDetail;
68
use tor_cell::{
69
    chancell::CircId,
70
    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
71
};
72

            
73
use tor_error::{bad_api_usage, internal, into_internal};
74
use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
75
use tor_protover::named;
76

            
77
pub use crate::crypto::binding::CircuitBinding;
78
pub use crate::memquota::StreamAccount;
79
pub use crate::tunnel::circuit::unique_id::UniqId;
80

            
81
#[cfg(feature = "hs-service")]
82
use {
83
    crate::stream::{IncomingCmdChecker, IncomingStream},
84
    crate::tunnel::reactor::StreamReqInfo,
85
};
86

            
87
use futures::channel::mpsc;
88
use oneshot_fused_workaround as oneshot;
89

            
90
use crate::congestion::sendme::StreamRecvWindow;
91
use crate::DynTimeProvider;
92
use futures::FutureExt as _;
93
use std::collections::HashMap;
94
use std::net::IpAddr;
95
use std::sync::{Arc, Mutex};
96
use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
97

            
98
use crate::crypto::handshake::ntor::NtorPublicKey;
99

            
100
pub use path::{Path, PathEntry};
101

            
102
/// Capacity of the buffer used for communication between a `ClientCirc` and its reactor.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
104

            
105
#[cfg(feature = "send-control-msg")]
106
use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
107

            
108
pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
109
#[cfg(feature = "send-control-msg")]
110
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
111
pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
112

            
113
/// MPSC queue relating to a stream (either inbound or outbound), sender
114
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
115
/// MPSC queue relating to a stream (either inbound or outbound), receiver
116
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
117

            
118
/// MPSC queue for inbound data on its way from channel to circuit, sender
119
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
120
/// MPSC queue for inbound data on its way from channel to circuit, receiver
121
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
122

            
123
#[derive(Debug)]
124
/// A circuit that we have constructed over the Tor network.
125
///
126
/// # Circuit life cycle
127
///
128
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
129
/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
130
/// one of these, you invoke one of its `create_firsthop` methods (typically
131
/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
132
/// [`create_firsthop()`](PendingClientCirc::create_firsthop)).
133
/// Then, to add more hops to the circuit, you can call
134
/// [`extend()`](ClientCirc::extend) on it.
135
///
136
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
137
/// `tor-proto` are probably not what you need.
138
///
139
/// After a circuit is created, it will persist until it is closed in one of
140
/// five ways:
141
///    1. A remote error occurs.
142
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
143
///       circuit.
144
///    3. The circuit's channel is closed.
145
///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
146
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
147
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
148
///       circuit from closing until all those streams have gone away.)
149
///
150
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
151
/// will just be unusable for most purposes.  Most operations on it will fail
152
/// with an error.
153
//
154
// Effectively, this struct contains two Arcs: one for `path` and one for
155
// `control` (which surely has something Arc-like in it).  We cannot unify
156
// these by putting a single Arc around the whole struct, and passing
157
// an Arc strong reference to the `Reactor`, because then `control` would
158
// not be dropped when the last user of the circuit goes away.  We could
159
// make the reactor have a weak reference but weak references are more
160
// expensive to dereference.
161
//
162
// Because of the above, cloning this struct is always going to involve
163
// two atomic refcount changes/checks.  Wrapping it in another Arc would
164
// be overkill.
165
//
166
pub struct ClientCirc {
167
    /// Mutable state shared with the `Reactor`.
168
    mutable: Arc<TunnelMutableState>,
169
    /// A unique identifier for this circuit.
170
    unique_id: UniqId,
171
    /// Channel to send control messages to the reactor.
172
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
173
    /// Channel to send commands to the reactor.
174
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
175
    /// A future that resolves to Cancelled once the reactor is shut down,
176
    /// meaning that the circuit is closed.
177
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
178
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
179
    /// For testing purposes: the CircId, for use in peek_circid().
180
    #[cfg(test)]
181
    circid: CircId,
182
    /// Memory quota account
183
    memquota: CircuitAccount,
184
    /// Time provider
185
    time_provider: DynTimeProvider,
186
}
187

            
188
/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
189
///
190
/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
191
/// but it is currently the best option we have for sharing
192
/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
193
/// In practice, these mutexes won't be accessed very often
194
/// (they're accessed for writing when a circuit is extended,
195
/// and for reading by the various `ClientCirc` APIs),
196
/// so they shouldn't really impact performance.
197
///
198
/// Alternatively, the circuit state information could be shared
199
/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
200
/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
201
/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
202
/// asynchronous, which will significantly complicate their callsites,
203
/// which would in turn need to be made async too.
204
///
205
/// We should revisit this decision at some point, and decide whether an async API
206
/// would be preferable.
207
#[derive(Debug, Default)]
208
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, (LegId, Arc<MutableState>)>>);
209

            
210
impl TunnelMutableState {
211
    /// Add the [`MutableState`] of a circuit.
212
160
    pub(super) fn insert(&self, unique_id: UniqId, leg: LegId, mutable: Arc<MutableState>) {
213
160
        #[allow(unused)] // unused in non-debug builds
214
160
        let state = self
215
160
            .0
216
160
            .lock()
217
160
            .expect("lock poisoned")
218
160
            .insert(unique_id, (leg, mutable));
219
160

            
220
160
        debug_assert!(state.is_none());
221
160
    }
222

            
223
    /// Remove the [`MutableState`] of a circuit.
224
8
    pub(super) fn remove(&self, unique_id: UniqId) {
225
8
        #[allow(unused)] // unused in non-debug builds
226
8
        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
8

            
228
8
        debug_assert!(state.is_some());
229
8
    }
230

            
231
    /// Return a [`Path`] object describing all the hops in the specified circuit.
232
    ///
233
    /// See [`MutableState::path`].
234
32
    fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
235
32
        let lock = self.0.lock().expect("lock poisoned");
236
32
        let (_leg, mutable) = lock
237
32
            .get(&unique_id)
238
32
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
239

            
240
32
        Ok(mutable.path())
241
32
    }
242

            
243
    /// Return a description of the first hop of this circuit.
244
    ///
245
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
246
    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
247
    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
248
        let lock = self.0.lock().expect("lock poisoned");
249
        let (_leg, mutable) = lock
250
            .get(&unique_id)
251
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
252

            
253
        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
254
            path::HopDetail::Relay(r) => r,
255
            #[cfg(feature = "hs-common")]
256
            path::HopDetail::Virtual => {
257
                panic!("somehow made a circuit with a virtual first hop.")
258
            }
259
        });
260

            
261
        Ok(first_hop)
262
    }
263

            
264
    /// Return the [`HopNum`] of the last hop of the specified circuit.
265
    ///
266
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
267
    ///
268
    /// See [`MutableState::last_hop_num`].
269
32
    fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
270
32
        let lock = self.0.lock().expect("lock poisoned");
271
32
        let (_leg, mutable) = lock
272
32
            .get(&unique_id)
273
32
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
274

            
275
32
        Ok(mutable.last_hop_num())
276
32
    }
277

            
278
    /// Return the number of hops in the specified circuit.
279
    ///
280
    /// See [`MutableState::n_hops`].
281
80
    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
282
80
        let lock = self.0.lock().expect("lock poisoned");
283
80
        let (_leg, mutable) = lock
284
80
            .get(&unique_id)
285
80
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
286

            
287
80
        Ok(mutable.n_hops())
288
80
    }
289

            
290
    /// Return the cryptographic material used to prove knowledge of a shared
291
    /// secret with with `hop` on the circuit with the specified `unique_id`.
292
    fn binding_key(&self, unique_id: UniqId, hop: HopNum) -> Result<Option<CircuitBinding>> {
293
        let lock = self.0.lock().expect("lock poisoned");
294
        let (_leg, mutable) = lock
295
            .get(&unique_id)
296
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297

            
298
        Ok(mutable.binding_key(hop))
299
    }
300
}
301

            
302
/// The mutable state of a circuit.
303
32
#[derive(Educe, Default)]
304
#[educe(Debug)]
305
pub(super) struct MutableState(Mutex<CircuitState>);
306

            
307
impl MutableState {
308
    /// Add a hop to the path of this circuit.
309
408
    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
310
408
        let mut mutable = self.0.lock().expect("poisoned lock");
311
408
        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
312
408
        mutable.binding.push(binding);
313
408
    }
314

            
315
    /// Get a copy of the circuit's current [`path::Path`].
316
32
    pub(super) fn path(&self) -> Arc<path::Path> {
317
32
        let mutable = self.0.lock().expect("poisoned lock");
318
32
        Arc::clone(&mutable.path)
319
32
    }
320

            
321
    /// Return the cryptographic material used to prove knowledge of a shared
322
    /// secret with with `hop`.
323
    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
324
        let mutable = self.0.lock().expect("poisoned lock");
325

            
326
        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
327
        // NOTE: I'm not thrilled to have to copy this information, but we use
328
        // it very rarely, so it's not _that_ bad IMO.
329
    }
330

            
331
    /// Return a description of the first hop of this circuit.
332
    fn first_hop(&self) -> Option<HopDetail> {
333
        let mutable = self.0.lock().expect("poisoned lock");
334
        mutable.path.first_hop()
335
    }
336

            
337
    /// Return the [`HopNum`] of the last hop of this circuit.
338
    ///
339
    /// NOTE: This function will return the [`HopNum`] of the hop
340
    /// that is _currently_ the last. If there is an extend operation in progress,
341
    /// the currently pending hop may or may not be counted, depending on whether
342
    /// the extend operation finishes before this call is done.
343
32
    fn last_hop_num(&self) -> Option<HopNum> {
344
32
        let mutable = self.0.lock().expect("poisoned lock");
345
32
        mutable.path.last_hop_num()
346
32
    }
347

            
348
    /// Return the number of hops in this circuit.
349
    ///
350
    /// NOTE: This function will currently return only the number of hops
351
    /// _currently_ in the circuit. If there is an extend operation in progress,
352
    /// the currently pending hop may or may not be counted, depending on whether
353
    /// the extend operation finishes before this call is done.
354
80
    fn n_hops(&self) -> usize {
355
80
        let mutable = self.0.lock().expect("poisoned lock");
356
80
        mutable.path.n_hops()
357
80
    }
358
}
359

            
360
/// The shared state of a circuit.
361
32
#[derive(Educe, Default)]
362
#[educe(Debug)]
363
pub(super) struct CircuitState {
364
    /// Information about this circuit's path.
365
    ///
366
    /// This is stored in an Arc so that we can cheaply give a copy of it to
367
    /// client code; when we need to add a hop (which is less frequent) we use
368
    /// [`Arc::make_mut()`].
369
    path: Arc<path::Path>,
370

            
371
    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
372
    /// in the circuit's path.
373
    ///
374
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
375
    /// fair chance that this will change in the future, and I don't want other
376
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
377
    /// an `Option`.
378
    #[educe(Debug(ignore))]
379
    binding: Vec<Option<CircuitBinding>>,
380
}
381

            
382
/// A ClientCirc that needs to send a create cell and receive a created* cell.
383
///
384
/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
385
/// to negotiate the cryptographic handshake with the first hop.
386
pub struct PendingClientCirc {
387
    /// A oneshot receiver on which we'll receive a CREATED* cell,
388
    /// or a DESTROY cell.
389
    recvcreated: oneshot::Receiver<CreateResponse>,
390
    /// The ClientCirc object that we can expose on success.
391
    circ: Arc<ClientCirc>,
392
}
393

            
394
/// Description of the network's current rules for building circuits.
395
#[non_exhaustive]
396
#[derive(Clone, Debug)]
397
pub struct CircParameters {
398
    /// Whether we should include ed25519 identities when we send
399
    /// EXTEND2 cells.
400
    pub extend_by_ed25519_id: bool,
401
    /// Congestion control parameters for this circuit.
402
    pub ccontrol: CongestionControlParams,
403
}
404

            
405
#[cfg(test)]
impl std::default::Default for CircParameters {
    /// Test-only defaults: extend by ed25519 identity, and use the fixed
    /// congestion control parameters built by the congestion test utilities.
    fn default() -> Self {
        let ccontrol = crate::congestion::test_utils::params::build_cc_fixed_params();

        Self {
            extend_by_ed25519_id: true,
            ccontrol,
        }
    }
}
414

            
415
impl CircParameters {
416
    /// Constructor
417
527
    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
418
527
        Self {
419
527
            extend_by_ed25519_id,
420
527
            ccontrol,
421
527
        }
422
527
    }
423
}
424

            
425
impl ClientCirc {
426
    /// Return a description of the first hop of this circuit.
427
    ///
428
    /// # Panics
429
    ///
430
    /// Panics if there is no first hop.  (This should be impossible outside of
431
    /// the tor-proto crate, but within the crate it's possible to have a
432
    /// circuit with no hops.)
433
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
434
        Ok(self
435
            .mutable
436
            .first_hop(self.unique_id)
437
            .map_err(|_| Error::CircuitClosed)?
438
            .expect("called first_hop on an un-constructed circuit"))
439
    }
440

            
441
    /// Return the [`HopNum`] of the last hop of this circuit.
442
    ///
443
    /// Returns an error if there is no last hop.  (This should be impossible outside of the
444
    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
445
    ///
446
    /// NOTE: This function will return the [`HopNum`] of the hop
447
    /// that is _currently_ the last. If there is an extend operation in progress,
448
    /// the currently pending hop may or may not be counted, depending on whether
449
    /// the extend operation finishes before this call is done.
450
32
    pub fn last_hop_num(&self) -> Result<HopNum> {
451
32
        Ok(self
452
32
            .mutable
453
32
            .last_hop_num(self.unique_id)?
454
32
            .ok_or_else(|| internal!("no last hop index"))?)
455
32
    }
456

            
457
    /// Return a [`Path`] object describing all the hops in this circuit.
458
    ///
459
    /// Note that this `Path` is not automatically updated if the circuit is
460
    /// extended.
461
32
    pub fn path_ref(&self) -> Result<Arc<Path>> {
462
32
        self.mutable
463
32
            .path_ref(self.unique_id)
464
32
            .map_err(|_| Error::CircuitClosed)
465
32
    }
466

            
467
    /// Get the clock skew claimed by the first hop of the circuit.
468
    ///
469
    /// See [`Channel::clock_skew()`].
470
    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
471
        let (tx, rx) = oneshot::channel();
472

            
473
        self.control
474
            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
475
            .map_err(|_| Error::CircuitClosed)?;
476

            
477
        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
478
    }
479

            
480
    /// Return a reference to this circuit's memory quota account
481
56
    pub fn mq_account(&self) -> &CircuitAccount {
482
56
        &self.memquota
483
56
    }
484

            
485
    /// Return the cryptographic material used to prove knowledge of a shared
486
    /// secret with with `hop`.
487
    ///
488
    /// See [`CircuitBinding`] for more information on how this is used.
489
    ///
490
    /// Return None if we have no circuit binding information for the hop, or if
491
    /// the hop does not exist.
492
    pub fn binding_key(&self, hop: HopNum) -> Result<Option<CircuitBinding>> {
493
        self.mutable
494
            .binding_key(self.unique_id, hop)
495
            .map_err(|_| Error::CircuitClosed)
496
    }
497

            
498
    /// Start an ad-hoc protocol exchange to the specified hop on this circuit.
    ///
    /// To use this:
    ///
    ///  0. Create an inter-task channel you'll use to receive
    ///     the outcome of your conversation,
    ///     and bundle it into a [`MsgHandler`].
    ///
    ///  1. Call `start_conversation`.
    ///     This will install your handler for incoming messages,
    ///     and send the outgoing message (if you provided one).
    ///     After that, each message on the circuit
    ///     that isn't handled by the core machinery
    ///     is passed to your provided `reply_handler`.
    ///
    ///  2. Possibly call `send_msg` on the [`Conversation`],
    ///     from the call site of `start_conversation`,
    ///     possibly multiple times, from time to time,
    ///     to send further desired messages to the peer.
    ///
    ///  3. In your [`MsgHandler`], process the incoming messages.
    ///     You may respond by
    ///     sending additional messages.
    ///     When the protocol exchange is finished,
    ///     `MsgHandler::handle_msg` should return
    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
    ///
    /// If you don't need the `Conversation` to send followup messages,
    /// you may simply drop it,
    /// and rely on the responses you get from your handler,
    /// on the channel from step 0 above.
    /// Your handler will remain installed and able to process incoming messages
    /// until it returns `ConversationFinished`.
    ///
    /// (If you don't want to accept any replies at all, it may be
    /// simpler to use [`ClientCirc::send_raw_msg`].)
    ///
    /// Note that it is quite possible to use this function to violate the tor
    /// protocol; most users of this API will not need to call it.  It is used
    /// to implement most of the onion service handshake.
    ///
    /// # Limitations
    ///
    /// Only one conversation may be active at any one time,
    /// for any one circuit.
    /// This generally means that this function should not be called
    /// on a circuit which might be shared with anyone else.
    ///
    /// Likewise, it is forbidden to try to extend the circuit
    /// while the conversation is in progress.
    ///
    /// After the conversation has finished, the circuit may be extended.
    /// Or, `start_conversation` may be called again;
    /// but, in that case there will be a gap between the two conversations,
    /// during which no `MsgHandler` is installed,
    /// and unexpected incoming messages would close the circuit.
    ///
    /// If these restrictions are violated, the circuit will be closed with an error.
    ///
    /// ## Precise definition of the lifetime of a conversation
    ///
    /// A conversation is in progress from entry to `start_conversation`,
    /// until entry to the body of the [`MsgHandler::handle_msg`]
    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
    /// (*Entry* since `handle_msg` is synchronously embedded
    /// into the incoming message processing.)
    /// So you may start a new conversation as soon as you have the final response
    /// via your inter-task channel from (0) above.
    ///
    /// The lifetime relationship of the [`Conversation`],
    /// vs the handler returning `ConversationFinished`
    /// is not enforced by the type system.
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
    // at least while allowing sending followup messages from outside the handler.
    //
    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
    //   tor-proto interface.
    #[cfg(feature = "send-control-msg")]
    pub async fn start_conversation(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
        hop_num: HopNum,
    ) -> Result<Conversation<'_>> {
        // Wrap the caller's handler so that it is tied to `hop_num`.
        let user_handler = UserMsgHandler::new(hop_num, reply_handler);
        let conversation = Conversation(self);

        // Install the handler and send the initial message (if any) in one step.
        conversation
            .send_internal(msg, Some(Box::new(user_handler)))
            .await?;

        Ok(conversation)
    }
587

            
588
    /// Send an ad-hoc message to a given hop on the circuit, without expecting
    /// a reply.
    ///
    /// The message is handed to the reactor as a control message; this
    /// function then waits for the reactor to report the outcome.
    /// Returns [`Error::CircuitClosed`] if the control message can't be sent,
    /// or if the outcome channel is dropped without a reply.
    ///
    /// (If you want to handle one or more possible replies, see
    /// [`ClientCirc::start_conversation`].)
    #[cfg(feature = "send-control-msg")]
    pub async fn send_raw_msg(
        &self,
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
        hop_num: HopNum,
    ) -> Result<()> {
        let (sender, outcome_rx) = oneshot::channel();

        self.control
            .unbounded_send(CtrlMsg::SendMsg {
                hop_num,
                msg,
                sender,
            })
            .map_err(|_| Error::CircuitClosed)?;

        match outcome_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
611

            
612
    /// Tell this circuit to begin allowing the final hop of the circuit to try
    /// to create new Tor streams, and to return those pending requests in an
    /// asynchronous stream.
    ///
    /// Ordinarily, these requests are rejected.
    ///
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
    /// If such a [`Stream`](futures::Stream) already exists, this method will return
    /// an error.
    ///
    /// After this method has been called on a circuit, the circuit is expected
    /// to receive requests of this type indefinitely, until it is finally closed.
    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
    ///
    /// Only onion services (and, eventually, exit relays) should call this
    /// method.
    //
    // TODO: Someday, we might want to allow a stream request handler to be
    // un-registered.  However, nothing in the Tor protocol requires it.
    //
    // TODO(conflux): when conflux is ready, we need to update these docs to say
    // that on a multipath circuit, this function **must** be called on the "main"
    // (initial) circuit into which all of the other circuit legs are linked,
    // or on the resulting ClientTunnel itself.
    //
    // Any incoming request handlers installed on the other circuits
    // (which are shut down using CtrlCmd::ShutdownAndReturnCircuit)
    // will be discarded (along with the reactor of that circuit)
    #[cfg(feature = "hs-service")]
    pub async fn allow_stream_requests(
        self: &Arc<ClientCirc>,
        allow_commands: &[tor_cell::relaycell::RelayCmd],
        hop_num: HopNum,
        filter: impl crate::stream::IncomingStreamRequestFilter,
    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
        use crate::tunnel::HopLocation;
        use futures::stream::StreamExt;

        /// The size of the channel receiving IncomingStreamRequestContexts.
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

        let time_provider = self.time_provider.clone();
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
        let queue_spec = MpscSpec::new(INCOMING_BUFFER);
        let (incoming_sender, incoming_receiver) =
            queue_spec.new_mq(time_provider, self.memquota.as_raw_account())?;
        let (done, outcome) = oneshot::channel();

        // Ask the reactor to start accepting stream requests from `hop_num`.
        let request = CtrlCmd::AwaitStreamRequest {
            cmd_checker,
            incoming_sender,
            hop_num,
            done,
            filter: Box::new(filter),
        };
        self.command
            .unbounded_send(request)
            .map_err(|_| Error::CircuitClosed)?;

        // Check whether the AwaitStreamRequest was processed successfully.
        outcome.await.map_err(|_| Error::CircuitClosed)??;

        // TODO(conflux): maybe this function should take a HopLocation instead of a HopNum,
        // but we currently cannot resolve `HopLocation`s outside of the reactor
        // (and we need the resolved HopNum to assert the stream request indeed came from the right hop below).
        let allowed_hop_num = hop_num;

        let circ = Arc::clone(self);
        let incoming_streams = incoming_receiver.map(
            move |StreamReqInfo {
                      req,
                      stream_id,
                      hop_num: req_hop_num,
                      leg,
                      receiver,
                      msg_tx,
                      memquota,
                      relay_cell_format,
                  }| {
                // We already enforce this in handle_incoming_stream_request; this
                // assertion is just here to make sure that we don't ever
                // accidentally remove or fail to enforce that check, since it is
                // security-critical.
                assert_eq!(allowed_hop_num, req_hop_num);

                // TODO(conflux): figure out what this is going to look like
                // for onion services (perhaps we should forbid this function
                // from being called on a multipath circuit?)
                //
                // See also:
                // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
                let target = StreamTarget {
                    circ: Arc::clone(&circ),
                    tx: msg_tx,
                    hop: HopLocation::Hop((leg, req_hop_num)),
                    stream_id,
                    relay_cell_format,
                };

                // The reader gets its own copy of the target, and starts out
                // with a fresh receive window.
                let reader = StreamReader {
                    target: target.clone(),
                    receiver,
                    recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
                    ended: false,
                };

                IncomingStream::new(req, target, reader, memquota)
            },
        );

        Ok(incoming_streams)
    }
721

            
722
    /// Extend the circuit, via the most appropriate circuit extension handshake,
723
    /// to the chosen `target` hop.
724
    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
725
    where
726
        Tg: CircTarget,
727
    {
728
        // For now we use the simplest decision-making mechanism:
729
        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
730
        //
731
        // This behavior is slightly different from C tor, which uses ntor v3
732
        // only whenever it want to send any extension in the circuit message.
733
        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
734
        // want to use an extension if we can, and so it doesn't make too much
735
        // sense to detect the case where we have no extensions.
736
        //
737
        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
738
        // on the tor network, and so we cannot simply assume that everybody has it.)
739
        if target
740
            .protovers()
741
            .supports_named_subver(named::RELAY_NTORV3)
742
        {
743
            self.extend_ntor_v3(target, params).await
744
        } else {
745
            self.extend_ntor(target, params).await
746
        }
747
    }
748

            
749
    /// Extend the circuit via the ntor handshake to a new target last
750
    /// hop.
751
40
    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
752
40
    where
753
40
        Tg: CircTarget,
754
40
    {
755
40
        let key = NtorPublicKey {
756
40
            id: *target
757
40
                .rsa_identity()
758
40
                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
759
40
            pk: *target.ntor_onion_key(),
760
        };
761
40
        let mut linkspecs = target
762
40
            .linkspecs()
763
40
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
764
40
        if !params.extend_by_ed25519_id {
765
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
766
40
        }
767

            
768
40
        let (tx, rx) = oneshot::channel();
769
40

            
770
40
        let peer_id = OwnedChanTarget::from_chan_target(target);
771
40
        self.control
772
40
            .unbounded_send(CtrlMsg::ExtendNtor {
773
40
                peer_id,
774
40
                public_key: key,
775
40
                linkspecs,
776
40
                params,
777
40
                done: tx,
778
40
            })
779
40
            .map_err(|_| Error::CircuitClosed)?;
780

            
781
40
        rx.await.map_err(|_| Error::CircuitClosed)??;
782

            
783
8
        Ok(())
784
40
    }
785

            
786
    /// Extend the circuit via the ntor handshake to a new target last
787
    /// hop.
788
8
    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
789
8
    where
790
8
        Tg: CircTarget,
791
8
    {
792
8
        let key = NtorV3PublicKey {
793
8
            id: *target
794
8
                .ed_identity()
795
8
                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
796
8
            pk: *target.ntor_onion_key(),
797
        };
798
8
        let mut linkspecs = target
799
8
            .linkspecs()
800
8
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
801
8
        if !params.extend_by_ed25519_id {
802
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
803
8
        }
804

            
805
8
        let (tx, rx) = oneshot::channel();
806
8

            
807
8
        let peer_id = OwnedChanTarget::from_chan_target(target);
808
8
        self.control
809
8
            .unbounded_send(CtrlMsg::ExtendNtorV3 {
810
8
                peer_id,
811
8
                public_key: key,
812
8
                linkspecs,
813
8
                params,
814
8
                done: tx,
815
8
            })
816
8
            .map_err(|_| Error::CircuitClosed)?;
817

            
818
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
819

            
820
8
        Ok(())
821
8
    }
822

            
823
    /// Extend this circuit by a single, "virtual" hop.
824
    ///
825
    /// A virtual hop is one for which we do not add an actual network connection
826
    /// between separate hosts (such as Relays).  We only add a layer of
827
    /// cryptography.
828
    ///
829
    /// This is used to implement onion services: the client and the service
830
    /// both build a circuit to a single rendezvous point, and tell the
831
    /// rendezvous point to relay traffic between their two circuits.  Having
832
    /// completed a [`handshake`] out of band[^1], the parties each extend their
833
    /// circuits by a single "virtual" encryption hop that represents their
834
    /// shared cryptographic context.
835
    ///
836
    /// Once a circuit has been extended in this way, it is an error to try to
837
    /// extend it in any other way.
838
    ///
839
    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
840
    ///     client sends their half of the handshake in an `INTRODUCE1` message, and the
841
    ///     service's response is inline in its `RENDEZVOUS2` message.
842
    //
843
    // TODO hs: let's try to enforce the "you can't extend a circuit again once
844
    // it has been extended this way" property.  We could do that with internal
845
    // state, or some kind of a type state pattern.
846
    //
847
    // TODO hs: possibly we should take a set of Protovers, and not just `Params`.
848
    #[cfg(feature = "hs-common")]
    pub async fn extend_virtual(
        &self,
        protocol: handshake::RelayProtocol,
        role: handshake::HandshakeRole,
        seed: impl handshake::KeyGenerator,
        params: CircParameters,
    ) -> Result<()> {
        // Convert the caller-facing protocol selector into the crypto-layer
        // protocol, and record which relay cell format it uses.
        let crypt_protocol = handshake::RelayCryptLayerProtocol::from(protocol);
        let relay_cell_format = crypt_protocol.relay_cell_format();

        // Build the client-side crypto layers for the new hop from the
        // key material provided by `seed`.
        let handshake::BoxedClientLayer { fwd, back, binding } =
            crypt_protocol.construct_client_layers(role, seed)?;

        let (done, outcome) = oneshot::channel();
        self.command
            .unbounded_send(CtrlCmd::ExtendVirtual {
                relay_cell_format,
                cell_crypto: (fwd, back, binding),
                params,
                done,
            })
            .map_err(|_| Error::CircuitClosed)?;

        // A dropped reply channel is reported as a closed circuit; otherwise
        // the received result is returned as-is.
        outcome.await.map_err(|_| Error::CircuitClosed)?
    }
878

            
879
    /// Helper, used to begin a stream.
880
    ///
881
    /// This function allocates a stream ID, and sends the message
882
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
883
    ///
884
    /// The caller will typically want to see the first cell in response,
885
    /// to see whether it is e.g. an END or a CONNECTED.
886
56
    async fn begin_stream_impl(
887
56
        self: &Arc<ClientCirc>,
888
56
        begin_msg: AnyRelayMsg,
889
56
        cmd_checker: AnyCmdChecker,
890
84
    ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
891
56
        // TODO: Possibly this should take a hop, rather than just
892
56
        // assuming it's the last hop.
893
56
        let hop = TargetHop::LastHop;
894
56

            
895
56
        let time_prov = self.time_provider.clone();
896

            
897
56
        let memquota = StreamAccount::new(self.mq_account())?;
898
56
        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
899
56
            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
900
56
        let (tx, rx) = oneshot::channel();
901
56
        let (msg_tx, msg_rx) =
902
56
            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
903

            
904
56
        self.control
905
56
            .unbounded_send(CtrlMsg::BeginStream {
906
56
                hop,
907
56
                message: begin_msg,
908
56
                sender,
909
56
                rx: msg_rx,
910
56
                done: tx,
911
56
                cmd_checker,
912
56
            })
913
56
            .map_err(|_| Error::CircuitClosed)?;
914

            
915
56
        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
916

            
917
56
        let target = StreamTarget {
918
56
            circ: self.clone(),
919
56
            tx: msg_tx,
920
56
            hop,
921
56
            stream_id,
922
56
            relay_cell_format,
923
56
        };
924
56

            
925
56
        let reader = StreamReader {
926
56
            target: target.clone(),
927
56
            receiver,
928
56
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
929
56
            ended: false,
930
56
        };
931
56

            
932
56
        Ok((reader, target, memquota))
933
56
    }
934

            
935
    /// Start a DataStream (anonymized connection) to the given
936
    /// address and port, using a BEGIN cell.
937
56
    async fn begin_data_stream(
938
56
        self: &Arc<ClientCirc>,
939
56
        msg: AnyRelayMsg,
940
56
        optimistic: bool,
941
84
    ) -> Result<DataStream> {
942
56
        let (reader, target, memquota) = self
943
56
            .begin_stream_impl(msg, DataCmdChecker::new_any())
944
56
            .await?;
945
56
        let mut stream = DataStream::new(reader, target, memquota);
946
56
        if !optimistic {
947
48
            stream.wait_for_connection().await?;
948
8
        }
949
56
        Ok(stream)
950
56
    }
951

            
952
    /// Start a stream to the given address and port, using a BEGIN
953
    /// cell.
954
    ///
955
    /// The use of a string for the address is intentional: you should let
956
    /// the remote Tor relay do the hostname lookup for you.
957
48
    pub async fn begin_stream(
958
48
        self: &Arc<ClientCirc>,
959
48
        target: &str,
960
48
        port: u16,
961
48
        parameters: Option<StreamParameters>,
962
72
    ) -> Result<DataStream> {
963
48
        let parameters = parameters.unwrap_or_default();
964
48
        let begin_flags = parameters.begin_flags();
965
48
        let optimistic = parameters.is_optimistic();
966
48
        let target = if parameters.suppressing_hostname() {
967
            ""
968
        } else {
969
48
            target
970
        };
971
48
        let beginmsg = Begin::new(target, port, begin_flags)
972
48
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
973
48
        self.begin_data_stream(beginmsg.into(), optimistic).await
974
48
    }
975

            
976
    /// Start a new stream to the last relay in the circuit, using
977
    /// a BEGIN_DIR cell.
978
12
    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
979
8
        // Note that we always open begindir connections optimistically.
980
8
        // Since they are local to a relay that we've already authenticated
981
8
        // with and built a circuit to, there should be no additional checks
982
8
        // we need to perform to see whether the BEGINDIR will succeed.
983
8
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
984
8
            .await
985
8
    }
986

            
987
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
988
    /// in this circuit.
989
    ///
990
    /// Note that this function does not check for timeouts; that's
991
    /// the caller's responsibility.
992
    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
993
        let resolve_msg = Resolve::new(hostname);
994

            
995
        let resolved_msg = self.try_resolve(resolve_msg).await?;
996

            
997
        resolved_msg
998
            .into_answers()
999
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
    /// the last relay on this circuit.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
        resolved_msg
            .into_answers()
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Hostname(v)) => Some(
                    String::from_utf8(v)
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
                ),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }
    /// Helper: send `msg` on a new resolve stream, and read the resolved
    /// message back from that stream.
    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
        let checker = ResolveCmdChecker::new_any();
        // The stream target is not needed here: only the reader is used.
        let (reader, _target, memquota) = self.begin_stream_impl(msg.into(), checker).await?;

        let mut stream = ResolveStream::new(reader, memquota);
        stream.read_msg().await
    }
    /// Shut down this circuit, along with all streams that are using it.
    ///
    /// The shutdown happens asynchronously: the circuit won't necessarily be
    /// done shutting down immediately after this function returns.
    ///
    /// Note that other references to this circuit may exist.  If they
    /// do, they will stop working after you call this function.
    ///
    /// It's not necessary to call this method if you're just done
    /// with a circuit: the circuit should close on its own once nothing
    /// is using it any more.
    pub fn terminate(&self) {
        let shutdown = CtrlCmd::Shutdown;
        // A failed send is deliberately ignored: the other methods here treat
        // that failure as "circuit closed", in which case there is nothing
        // left to shut down.
        let _ = self.command.unbounded_send(shutdown);
    }
    /// Called when a circuit-level protocol error has occurred and the
    /// circuit needs to shut down.
    ///
    /// This is a separate function because we may eventually want to have
    /// it do more than just shut down.
    ///
    /// As with [`terminate`](Self::terminate), the shutdown happens
    /// asynchronously: the circuit may not have finished closing when this
    /// function returns.
    pub(crate) fn protocol_error(&self) {
        // For now, a protocol error is handled exactly like a requested shutdown.
        self.terminate();
    }
    /// Return true if this circuit is closed and therefore unusable.
4
    pub fn is_closing(&self) -> bool {
4
        self.control.is_closed()
4
    }
    /// Return a process-unique identifier for this circuit.
    ///
    /// This is the same value that `PendingClientCirc::peek_unique_id` reports
    /// for the circuit before its first hop has been created.
    pub fn unique_id(&self) -> UniqId {
        self.unique_id
    }
    /// Return the number of hops in this circuit.
    ///
    /// NOTE: This function will currently return only the number of hops
    /// _currently_ in the circuit. If there is an extend operation in progress,
    /// the currently pending hop may or may not be counted, depending on whether
    /// the extend operation finishes before this call is done.
80
    pub fn n_hops(&self) -> Result<usize> {
80
        self.mutable
80
            .n_hops(self.unique_id)
80
            .map_err(|_| Error::CircuitClosed)
80
    }
    /// Return a future that will resolve once this circuit has closed.
    ///
    /// Note that this method does not itself cause the circuit to shut down.
    ///
    /// The returned future does not borrow from `self`, so it can outlive
    /// this handle.
    ///
    /// TODO: Perhaps this should return some kind of status indication instead
    /// of just ()
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        // Each caller gets its own clone of the shared "reactor closed"
        // future; whatever that future yields is discarded.
        self.reactor_closed_rx.clone().map(|_| ())
    }
}
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientCirc::start_conversation`],
/// and used to send messages to the last hop relay.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);

#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `MsgHandler` set up
    /// when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        // Sending a message never installs a new handler.
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`.
    /// Both `msg` and `handler` are optional.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        // NOTE(review): the `None` passed here is presumably "no stream ID" —
        // confirm against `AnyRelayMsgOuter::new`.
        let msg = msg.map(|body| tor_cell::relaycell::AnyRelayMsgOuter::new(None, body));

        let (sender, reply) = oneshot::channel();
        let circ = self.0;
        circ.control
            .unbounded_send(CtrlMsg::SendMsgAndInstallHandler {
                msg,
                handler,
                sender,
            })
            .map_err(|_| Error::CircuitClosed)?;

        // A dropped reply channel is reported as a closed circuit; otherwise
        // the received result is returned as-is.
        reply.await.map_err(|_| Error::CircuitClosed)?
    }
}
impl PendingClientCirc {
    /// Instantiate a new circuit object: used from Channel::new_circ().
    ///
    /// Does not send a CREATE* cell on its own.
    ///
    ///
160
    pub(crate) fn new(
160
        id: CircId,
160
        channel: Arc<Channel>,
160
        createdreceiver: oneshot::Receiver<CreateResponse>,
160
        input: CircuitRxReceiver,
160
        unique_id: UniqId,
160
        runtime: DynTimeProvider,
160
        memquota: CircuitAccount,
160
    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
160
        let time_provider = channel.time_provider().clone();
160
        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
160
            Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
160

            
160
        let circuit = ClientCirc {
160
            mutable,
160
            unique_id,
160
            control: control_tx,
160
            command: command_tx,
160
            reactor_closed_rx: reactor_closed_rx.shared(),
160
            #[cfg(test)]
160
            circid: id,
160
            memquota,
160
            time_provider,
160
        };
160

            
160
        let pending = PendingClientCirc {
160
            recvcreated: createdreceiver,
160
            circ: Arc::new(circuit),
160
        };
160
        (pending, reactor)
160
    }
    /// Extract the process-unique identifier for this pending circuit.
    pub fn peek_unique_id(&self) -> UniqId {
        self.circ.unique_id
    }
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
    /// first hop of this circuit.
    ///
    /// There's no authentication in CRATE_FAST,
    /// so we don't need to know whom we're connecting to: we're just
    /// connecting to whichever relay the channel is for.
12
    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
8
        let (tx, rx) = oneshot::channel();
8
        self.circ
8
            .control
8
            .unbounded_send(CtrlMsg::Create {
8
                recv_created: self.recvcreated,
8
                handshake: CircuitHandshake::CreateFast,
8
                params,
8
                done: tx,
8
            })
8
            .map_err(|_| Error::CircuitClosed)?;
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
8
        Ok(self.circ)
8
    }
    /// Use the most appropriate handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
    pub async fn create_firsthop<Tg>(
        self,
        target: &Tg,
        params: CircParameters,
    ) -> Result<Arc<ClientCirc>>
    where
        Tg: tor_linkspec::CircTarget,
    {
        // (See note in ClientCirc::extend.)
        if target
            .protovers()
            .supports_named_subver(named::RELAY_NTORV3)
        {
            self.create_firsthop_ntor_v3(target, params).await
        } else {
            self.create_firsthop_ntor(target, params).await
        }
    }
    /// Use the ntor handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
8
    pub async fn create_firsthop_ntor<Tg>(
8
        self,
8
        target: &Tg,
8
        params: CircParameters,
8
    ) -> Result<Arc<ClientCirc>>
8
    where
8
        Tg: tor_linkspec::CircTarget,
8
    {
8
        let (tx, rx) = oneshot::channel();
8

            
8
        self.circ
8
            .control
8
            .unbounded_send(CtrlMsg::Create {
8
                recv_created: self.recvcreated,
8
                handshake: CircuitHandshake::Ntor {
8
                    public_key: NtorPublicKey {
8
                        id: *target
8
                            .rsa_identity()
8
                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
8
                        pk: *target.ntor_onion_key(),
8
                    },
8
                    ed_identity: *target
8
                        .ed_identity()
8
                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
                },
8
                params,
8
                done: tx,
8
            })
8
            .map_err(|_| Error::CircuitClosed)?;
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
8
        Ok(self.circ)
8
    }
    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
    ///
    /// Assumes that the target supports ntor_v3. The caller should verify
    /// this before calling this function, e.g. by validating that the target
    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
16
    pub async fn create_firsthop_ntor_v3<Tg>(
16
        self,
16
        target: &Tg,
16
        params: CircParameters,
16
    ) -> Result<Arc<ClientCirc>>
16
    where
16
        Tg: tor_linkspec::CircTarget,
16
    {
16
        let (tx, rx) = oneshot::channel();
16

            
16
        self.circ
16
            .control
16
            .unbounded_send(CtrlMsg::Create {
16
                recv_created: self.recvcreated,
16
                handshake: CircuitHandshake::NtorV3 {
16
                    public_key: NtorV3PublicKey {
16
                        id: *target
16
                            .ed_identity()
16
                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
16
                        pk: *target.ntor_onion_key(),
16
                    },
16
                },
16
                params,
16
                done: tx,
16
            })
16
            .map_err(|_| Error::CircuitClosed)?;
16
        rx.await.map_err(|_| Error::CircuitClosed)??;
16
        Ok(self.circ)
16
    }
}
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
/// it represents an error.
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
    match val {
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
        _ => Ok(val),
    }
}
#[cfg(test)]
pub(crate) mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::channel::OpenChanCellS2C;
    use crate::channel::{test::new_reactor, CodecError};
    use crate::congestion::test_utils::params::build_cc_vegas_params;
    use crate::crypto::cell::RelayCellBody;
    use crate::crypto::handshake::ntor_v3::NtorV3Server;
    #[cfg(feature = "hs-service")]
    use crate::stream::IncomingStreamRequestFilter;
    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
    use futures::channel::mpsc::{Receiver, Sender};
    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::sink::SinkExt;
    use futures::stream::StreamExt;
    use futures::task::SpawnExt;
    use hex_literal::hex;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Debug;
    use std::time::Duration;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCmd};
    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
    use tor_cell::relaycell::msg::SendmeTag;
    use tor_cell::relaycell::{
        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
    };
    use tor_linkspec::OwnedCircTarget;
    use tor_memquota::HasMemoryCost;
    use tor_rtcompat::Runtime;
    use tracing::trace;
    use tracing_test::traced_test;
    impl PendingClientCirc {
        /// Testing only: Extract the circuit ID for this pending circuit.
        ///
        /// Reads the `circid` field of the wrapped `circ`.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circ.circid
        }
    }
    impl ClientCirc {
        /// Testing only: Extract the circuit ID of this circuit.
        ///
        /// Returns a copy of this circuit's `circid` field.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circid
        }
    }
    /// Testing helper: encode `msg` (addressed to stream `id`, if any) as a
    /// V0-format relay cell body and wrap it in a [`ClientCircChanMsg::Relay`].
    ///
    /// Panics if encoding fails.
    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
        // TODO #1947: test other formats.
        let encoded: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
            .encode(RelayCellFormat::V0, &mut testing_rng())
            .unwrap();
        ClientCircChanMsg::Relay(chanmsg::Relay::from(encoded))
    }
    // Example relay IDs and keys
    //
    // These are shared by `example_target`, `example_ntor_key` and
    // `example_ntor_v3_key`, so that the simulated relay and the client-side
    // target agree with each other.

    /// Key material passed as the first argument when building the example
    /// ntor / ntor_v3 secret keys (presumably the secret half of `EXAMPLE_PK`).
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Key used as the ntor onion key of the example target.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Ed25519 identity of the example target (32 bytes, each set to 6).
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// RSA identity of the example target (20 bytes, each set to 10).
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
    ///
    /// This simply forwards `buffer` to `crate::fake_mpsc` and returns the
    /// resulting sender/receiver pair.
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
    /// Return an example [`OwnedCircTarget`] that can be used for an ntor handshake.
    ///
    /// The target carries the `EXAMPLE_*` identities and onion key, and
    /// advertises the protocol list `FlowCtrl=1`.
    fn example_target() -> OwnedCircTarget {
        let mut target = OwnedCircTarget::builder();
        {
            // The relay identities are set on the nested channel-target builder.
            let chan = target.chan_target();
            chan.ed_identity(EXAMPLE_ED_ID.into());
            chan.rsa_identity(EXAMPLE_RSA_ID.into());
        }
        target.ntor_onion_key(EXAMPLE_PK.into());
        target.protocols("FlowCtrl=1".parse().unwrap());
        target.build().unwrap()
    }
    /// Return the ntor secret key that the simulated relay uses, built from
    /// `EXAMPLE_SK`, `EXAMPLE_PK` and `EXAMPLE_RSA_ID`.
    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
        use crate::crypto::handshake::ntor::NtorSecretKey;

        NtorSecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_RSA_ID.into(),
        )
    }
    /// Return the ntor_v3 secret key that the simulated relay uses, built from
    /// `EXAMPLE_SK`, `EXAMPLE_PK` and `EXAMPLE_ED_ID`.
    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
        use crate::crypto::handshake::ntor_v3::NtorV3SecretKey;

        NtorV3SecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_ED_ID.into(),
        )
    }
    /// Create a fake channel whose reactor is already running on `rt`.
    ///
    /// Returns the channel together with the receiver and sender halves that
    /// `new_reactor` hands back alongside it.  Panics if the reactor task
    /// cannot be spawned.
    fn working_fake_channel<R: Runtime>(
        rt: &R,
    ) -> (
        Arc<Channel>,
        Receiver<AnyChanCell>,
        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    ) {
        let (chan, reactor, cell_rx, cell_tx) = new_reactor(rt.clone());
        // Run the reactor in the background; its exit status is not interesting here.
        let reactor_task = async {
            let _ignore = reactor.run().await;
        };
        rt.spawn(reactor_task).unwrap();
        (chan, cell_rx, cell_tx)
    }
    /// Which handshake type to use.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// The CREATE_FAST handshake (`create_firsthop_fast`); the tests below
        /// refuse to use it for extending a circuit.
        Fast,
        /// The ntor handshake (`create_firsthop_ntor` / `extend_ntor`).
        Ntor,
        /// The ntor_v3 handshake (`create_firsthop_ntor_v3` / `extend_ntor_v3`).
        NtorV3,
    }
    /// Drive a pending circuit through a first-hop create handshake of the given
    /// `handshake_type`, with a simulated relay answering on the other end.
    ///
    /// `with_cc` is only consulted for [`HandshakeType::NtorV3`]: when set, the
    /// client uses the parameters from `build_cc_vegas_params()`, and the
    /// simulated relay insists on seeing a `CcRequest` extension before
    /// answering with a `CcResponse`.
    ///
    /// Panics unless the handshake succeeds and the circuit reports one hop.
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
        // We want to try progressing from a pending circuit to a circuit
        // via the requested kind of create handshake.
        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);
        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );
        // Run the circuit reactor in the background.
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // Future to pretend to be a relay on the other end of the circuit.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            // The first cell on the channel should be the client's create cell.
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), Some(circid));
            // Run the server side of whichever handshake was requested.
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    // Pick how the relay answers the client's extensions.
                    let mut reply_fn = if with_cc {
                        |client_exts: &[CircRequestExt]| {
                            let _ = client_exts
                                .iter()
                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
                                .expect("Client failed to request CC");
                            // This needs to be aligned to test_utils params
                            // value due to validation that needs it in range.
                            Some(vec![CircResponseExt::CcResponse(
                                extend_ext::CcResponse::new(31),
                            )])
                        }
                    } else {
                        |_: &_| Some(vec![])
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut reply_fn,
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            // Hand the reply to the pending circuit.
            created_send.send(reply).unwrap();
        };
        // Future to pretend to be a client.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                HandshakeType::NtorV3 => {
                    let params = if with_cc {
                        // Setup CC vegas parameters.
                        CircParameters::new(true, build_cc_vegas_params())
                    } else {
                        params
                    };
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };
        // Run the client and the simulated relay side by side.
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
        let _circ = circ.unwrap();
        // Phew!  We've built a circuit!  Let's make sure it has one hop.
        assert_eq!(_circ.n_hops().unwrap(), 1);
    }
    /// Run `test_create` with [`HandshakeType::Fast`] and `with_cc` set to false.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    /// Run `test_create` with [`HandshakeType::Ntor`] and `with_cc` set to false.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    /// Run `test_create` with [`HandshakeType::NtorV3`] and `with_cc` set to false.
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
    /// Run `test_create` with [`HandshakeType::NtorV3`] and `with_cc` set to true.
    ///
    /// Only compiled when the `flowctl-cc` feature is enabled.
    #[traced_test]
    #[test]
    #[cfg(feature = "flowctl-cc")]
    fn test_create_ntor_v3_with_cc() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, true).await;
        });
    }
    // An encryption layer that doesn't do any crypto.   Can be used
    // as inbound or outbound, but not both at once.
    //
    // Instead of real authentication tags it hands out tags derived from a
    // counter, so tests can predict them.
    pub(crate) struct DummyCrypto {
        // Buffer for the most recently generated tag; `next_tag` overwrites
        // its first four bytes.
        counter_tag: [u8; 20],
        // Number of tags generated so far.
        counter: u32,
        // Whether `decrypt_inbound` should report a tag (`Some`) or not (`None`).
        lasthop: bool,
    }
    impl DummyCrypto {
        /// Produce the next tag and advance the counter.
        ///
        /// The current counter value is written, little-endian, into the first
        /// four bytes of `counter_tag`; the remaining bytes are left untouched.
        fn next_tag(&mut self) -> SendmeTag {
            let le_bytes = self.counter.to_le_bytes();
            self.counter_tag[..4].copy_from_slice(&le_bytes);
            self.counter += 1;
            self.counter_tag.into()
        }
    }
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        /// Leave the cell untouched and return the next counter-based tag.
        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
            self.next_tag()
        }
        /// Do nothing: this layer performs no encryption.
        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
    }
    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
        /// Leave the cell untouched.
        ///
        /// Returns the next counter-based tag when this layer was created with
        /// `lasthop` set, and `None` (without consuming a tag) otherwise.
        fn decrypt_inbound(
            &mut self,
            _cmd: ChanCmd,
            _cell: &mut RelayCellBody,
        ) -> Option<SendmeTag> {
            if !self.lasthop {
                return None;
            }
            Some(self.next_tag())
        }
    }
    impl DummyCrypto {
        pub(crate) fn new(lasthop: bool) -> Self {
            DummyCrypto {
                counter_tag: [0; 20],
                counter: 0,
                lasthop,
            }
        }
    }
    // Helper: set up a 3-hop circuit with no encryption, where the
    // next inbound message seems to come from hop next_msg_from
    //
    // Returns the circuit together with the sending half of the fake queue
    // that feeds messages to the circuit's reactor.
    async fn newcirc_ext<R: Runtime>(
        rt: &R,
        chan: Arc<Channel>,
        next_msg_from: HopNum,
    ) -> (Arc<ClientCirc>, CircuitRxSender) {
        let circid = CircId::new(128).unwrap();
        // No create handshake happens here, so the sending half goes unused.
        let (_created_send, created_recv) = oneshot::channel();
        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);
        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );
        // Run the circuit reactor in the background.
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // Take the circuit out of the pending wrapper without doing a handshake.
        let PendingClientCirc {
            circ,
            recvcreated: _,
        } = pending;
        // TODO #1067: Support other formats
        let relay_cell_format = RelayCellFormat::V0;
        // Add three fake hops, waiting for the reactor to confirm each one.
        for idx in 0_u8..3 {
            let params = CircParameters::default();
            let (tx, rx) = oneshot::channel();
            circ.command
                .unbounded_send(CtrlCmd::AddFakeHop {
                    relay_cell_format,
                    // Only the third hop (index 2) is flagged as the forward last hop.
                    fwd_lasthop: idx == 2,
                    // Only hop `next_msg_from` is flagged as the reverse last hop.
                    rev_lasthop: idx == u8::from(next_msg_from),
                    params,
                    done: tx,
                })
                .unwrap();
            rx.await.unwrap().unwrap();
        }
        (circ, circmsg_send)
    }
    // Helper: set up a 3-hop circuit with no encryption, where the
    // next inbound message seems to come from the last hop (hop 2).
    //
    // This is `newcirc_ext` with `next_msg_from` fixed to 2.
    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
        newcirc_ext(rt, chan, 2.into()).await
    }
    /// Extend a fake 3-hop circuit by one hop using `handshake_type`, with a
    /// simulated relay answering the EXTEND2 request, then check that the
    /// circuit and its path accessors report four hops ending at the example
    /// target.
    ///
    /// Panics if `handshake_type` is [`HandshakeType::Fast`], which cannot be
    /// used to extend.
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let params = CircParameters::default();
        // Client side: ask the circuit to extend to the example target.
        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
            };
            circ // gotta keep the circ alive, or the reactor would exit.
        };
        // Relay side: read the request and answer with an EXTENDED2.
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            // The request is expected to arrive in a RELAY_EARLY cell.
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            // Run the server side of the handshake on the request body.
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[CircRequestExt]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };
            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
        };
        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
        // Did we really add another hop?
        assert_eq!(circ.n_hops().unwrap(), 4);
        // Do the path accessors report a reasonable outcome?
        {
            // First via `all_hops()`, keeping only the relay hops.
            let path = circ.path_ref().unwrap();
            let path = path
                .all_hops()
                .filter_map(|hop| match hop {
                    path::HopDetail::Relay(r) => Some(r),
                    #[cfg(feature = "hs-common")]
                    path::HopDetail::Virtual => None,
                })
                .collect::<Vec<_>>();
            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            // The new (fourth) hop is the example target; the first hop is not.
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        {
            // Then the same checks via `n_hops()` and `hops()`.
            let path = circ.path_ref().unwrap();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
    /// Run `test_extend` with [`HandshakeType::Ntor`].
    #[traced_test]
    #[test]
    fn test_extend_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::Ntor).await;
        });
    }
    /// Run `test_extend` with [`HandshakeType::NtorV3`].
    #[traced_test]
    #[test]
    fn test_extend_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::NtorV3).await;
        });
    }
    /// Try an ntor extend on a fake 3-hop circuit while `bad_reply` is injected
    /// as the response, and return the error that the extend attempt produced.
    ///
    /// `reply_hop` is passed to `newcirc_ext` as the hop the next inbound
    /// message seems to come from.  Panics if the extend unexpectedly succeeds
    /// or if the circuit does not still have three hops afterwards.
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: ClientCircChanMsg,
    ) -> Error {
        let (chan, _rx, _sink) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
        let params = CircParameters::default();
        let target = example_target();
        #[allow(clippy::clone_on_copy)]
        let rtc = rt.clone();
        // Deliver the bad reply from a separate task, after a 100ms sleep.
        let sink_handle = rt
            .spawn_with_handle(async move {
                rtc.sleep(Duration::from_millis(100)).await;
                sink.send(bad_reply).await.unwrap();
                sink
            })
            .unwrap();
        let outcome = circ.extend_ntor(&target, params).await;
        // Wait for the sender task to finish, and hold on to the sink it returns.
        let _sink = sink_handle.await;
        // The failed extend must not have added a hop.
        assert_eq!(circ.n_hops().unwrap(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }
    /// An EXTENDED2 that seems to come from hop 1 rather than the last hop
    /// should make the extend fail with [`Error::CircuitClosed`].
    #[traced_test]
    #[test]
    fn bad_extend_wronghop() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![]).into();
            let cc = rmsg_to_ccmsg(None, extended2);
            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
            // This case shows up as a CircuitClosed error, since a message sent
            // from the wrong hop won't even be delivered to the extend
            // code's meta-handler.  Instead the unexpected message will cause
            // the circuit to get torn down.
            match error {
                Error::CircuitClosed => {}
                x => panic!("got other error: {}", x),
            }
        });
    }
    /// Replying with an EXTENDED message instead of EXTENDED2 should make the
    /// extend fail with a `BytesErr` reported for the "extended2 message" object.
    #[traced_test]
    #[test]
    fn bad_extend_wrongtype() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended = relaymsg::Extended::new(vec![7; 200]).into();
            let cc = rmsg_to_ccmsg(None, extended);
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::BytesErr {
                    err: tor_bytes::Error::InvalidMessage(_),
                    object: "extended2 message",
                } => {}
                other => panic!("{:?}", other),
            }
        });
    }
    /// Replying with a DESTROY cell should make the extend fail with
    /// [`Error::CircuitClosed`].
    #[traced_test]
    #[test]
    fn bad_extend_destroy() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::CircuitClosed => {}
                other => panic!("{:?}", other),
            }
        });
    }
    /// Replying with an EXTENDED2 whose body is 256 bytes of filler (not a real
    /// handshake reply) should make the extend fail with
    /// [`Error::BadCircHandshakeAuth`].
    #[traced_test]
    #[test]
    fn bad_extend_crypto() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
            let cc = rmsg_to_ccmsg(None, extended2);
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            assert!(matches!(error, Error::BadCircHandshakeAuth));
        });
    }
    /// Open a BEGINDIR stream on a fake circuit, send a request, and check that
    /// the reply data and the end of the stream are delivered to the reader,
    /// while a simulated relay checks the cells it receives.
    #[traced_test]
    #[test]
    fn begindir() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            let circid = circ.peek_circid();
            // Client side: open the stream, write a request, read the reply.
            let begin_and_send_fut = async move {
                // Here we'll say we've got a circuit, and we want to
                // make a simple BEGINDIR request with it.
                let mut stream = circ.begin_dir_stream().await.unwrap();
                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
                stream.flush().await.unwrap();
                let mut buf = [0_u8; 1024];
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
                // After the END cell, the next read should report end-of-stream.
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(n, 0);
                stream
            };
            // Relay side: answer the BEGINDIR, check the data, reply, then end.
            let reply_fut = async move {
                // We've disabled encryption on this circuit, so we can just
                // read the begindir cell.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
                // Reply with a Connected cell to indicate success.
                let connected = relaymsg::Connected::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Now read a DATA cell...
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
                // The data must arrive on the same stream as the BEGINDIR.
                assert_eq!(streamid_2, streamid);
                if let AnyRelayMsg::Data(d) = rmsg {
                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
                } else {
                    panic!();
                }
                // Write another data cell in reply!
                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
                    .unwrap()
                    .into();
                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
                // Send an END cell to say that the conversation is over.
                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
                (rx, sink) // gotta keep these alive, or the reactor will exit.
            };
            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
        });
    }
    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
    //
    // When `by_drop` is true both halves of the stream are dropped; otherwise
    // only the writer is closed and the reader is kept alive.  Either way, the
    // simulated relay expects to receive an END message after its CONNECTED.
    fn close_stream_helper(by_drop: bool) {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            // Client side: open a stream, then close it in the requested way.
            let stream_fut = async move {
                let stream = circ
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();
                let (r, mut w) = stream.split();
                if by_drop {
                    // Drop the writer and the reader, which should close the stream.
                    drop(r);
                    drop(w);
                    (None, circ) // make sure to keep the circuit alive
                } else {
                    // Call close on the writer, while keeping the reader alive.
                    w.close().await.unwrap();
                    (Some(r), circ)
                }
            };
            // Relay side: accept the stream, then wait for it to be ended.
            let handler_fut = async {
                // Read the BEGIN message.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
                // Reply with a CONNECTED.
                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Expect an END.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (_, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::END);
                (rx, sink) // keep these alive or the reactor will exit.
            };
            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
        });
    }
    /// Run `close_stream_helper` in the mode that drops both stream halves.
    #[traced_test]
    #[test]
    fn drop_stream() {
        close_stream_helper(true);
    }
    /// Run `close_stream_helper` in the mode that calls `close()` on the writer.
    #[traced_test]
    #[test]
    fn close_stream() {
        close_stream_helper(false);
    }
    // Set up a circuit and stream that expects some incoming SENDMEs.
    //
    // Opens a stream on a fake 3-hop circuit and writes `n_to_send` bytes of
    // zeroes to it, while a simulated relay accepts the stream and counts the
    // DATA cells it receives.
    //
    // Returns, in order: the circuit, the stream, the sender used to inject
    // messages into the circuit, the stream ID seen by the relay, the number of
    // DATA cells received, and the receiver and sender halves of the fake
    // channel.
    async fn setup_incoming_sendme_case<R: Runtime>(
        rt: &R,
        n_to_send: usize,
    ) -> (
        Arc<ClientCirc>,
        DataStream,
        CircuitRxSender,
        Option<StreamId>,
        usize,
        Receiver<AnyChanCell>,
        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    ) {
        let (chan, mut rx, sink2) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        // Client side: open a stream and push `n_to_send` bytes through it.
        let begin_and_send_fut = {
            // Work on a clone so that the original `circ` can be returned.
            let circ = circ.clone();
            async move {
                // Take our circuit and make a stream on it.
                let mut stream = circ
                    .begin_stream("www.example.com", 443, None)
                    .await
                    .unwrap();
                let junk = [0_u8; 1024];
                let mut remaining = n_to_send;
                // Write in chunks of at most `junk.len()` bytes.
                while remaining > 0 {
                    let n = std::cmp::min(remaining, junk.len());
                    stream.write_all(&junk[..n]).await.unwrap();
                    remaining -= n;
                }
                stream.flush().await.unwrap();
                stream
            }
        };
        // Relay side: accept the stream and count the incoming data.
        let receive_fut = async move {
            // Read the begin cell.
            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            let rmsg = match chmsg {
                AnyChanMsg::Relay(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
            // Reply with a connected cell...
            let connected = relaymsg::Connected::new_empty().into();
            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
            // Now read bytes from the stream until we have them all.
            let mut bytes_received = 0_usize;
            let mut cells_received = 0_usize;
            while bytes_received < n_to_send {
                // Read a data cell, and remember how much we got.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let rmsg = match chmsg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(streamid2, streamid);
                // Anything other than DATA at this point is a test failure.
                if let AnyRelayMsg::Data(dat) = rmsg {
                    cells_received += 1;
                    bytes_received += dat.as_ref().len();
                } else {
                    panic!();
                }
            }
            (sink, streamid, cells_received, rx)
        };
        let (stream, (sink, streamid, cells_received, rx)) =
            futures::join!(begin_and_send_fut, receive_fut);
        (circ, stream, sink, streamid, cells_received, rx, sink2)
    }
    /// A circuit-level SENDME carrying the expected tag, followed by a
    /// stream-level SENDME, must be accepted and must re-open the send window.
    #[traced_test]
    #[test]
    fn accept_valid_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // This payload size is expected to go out as exactly 301 DATA cells.
            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
            assert_eq!(cells_received, 301);

            // Ask the reactor which window and which sendme tags it currently
            // has on record for the last hop.
            let (window, tags) = {
                let (done, reply) = oneshot::channel();
                let query = CtrlCmd::QuerySendWindow {
                    hop: 2.into(),
                    done,
                };
                circ.command.unbounded_send(query).unwrap();
                reply.await.unwrap().unwrap()
            };
            // The window is down by the 301 cells that were sent...
            assert_eq!(window, 1000 - 301);
            // ...and one tag is pending for each of cells 100, 200 and 300.
            // (Each expected tag starts with that count in little-endian.)
            assert_eq!(tags.len(), 3);
            let tag_100 = SendmeTag::from(hex!("6400000000000000000000000000000000000000"));
            let tag_200 = SendmeTag::from(hex!("c800000000000000000000000000000000000000"));
            let tag_300 = SendmeTag::from(hex!("2c01000000000000000000000000000000000000"));
            assert_eq!(tags[0], tag_100);
            assert_eq!(tags[1], tag_200);
            assert_eq!(tags[2], tag_300);

            // Answer with a circuit-level sendme for the first pending tag and
            // then a stream-level sendme; hang on to the sink afterwards.
            let _sink = async move {
                let circ_sendme =
                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
                        .into();
                sink.send(rmsg_to_ccmsg(None, circ_sendme)).await.unwrap();

                let stream_sendme = relaymsg::Sendme::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, stream_sendme))
                    .await
                    .unwrap();
                sink
            }
            .await;
            rt.advance_until_stalled().await;

            // The circuit must still answer queries, and its window must have
            // grown by 100 compared to the first reading.
            let (window, _tags) = {
                let (done, reply) = oneshot::channel();
                let query = CtrlCmd::QuerySendWindow {
                    hop: 2.into(),
                    done,
                };
                circ.command.unbounded_send(query).unwrap();
                reply.await.unwrap().unwrap()
            };
            assert_eq!(window, 1000 - 201);
        });
    }
    /// A circuit-level SENDME whose tag matches nothing we sent must cause the
    /// circuit to start closing.
    #[traced_test]
    #[test]
    fn invalid_circ_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // Identical setup to accept_valid_sendme(); only the tag in the
            // reply differs.
            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;

            // Deliver a circuit-level sendme carrying a bogus tag, keeping the
            // sink around afterwards.
            let _sink = async move {
                let bogus_tag = hex!("FFFF0000000000000000000000000000000000FF");
                let bad_sendme = relaymsg::Sendme::new_tag(bogus_tag).into();
                sink.send(rmsg_to_ccmsg(None, bad_sendme)).await.unwrap();
                sink
            }
            .await;

            // Let the reactor process the cell; it should give up on the
            // circuit because the data it received was invalid.
            rt.advance_until_stalled().await;
            assert!(circ.is_closing());
        });
    }
    /// Check that several busy streams sharing one circuit are served roughly
    /// fairly.
    ///
    /// `N_STREAMS` clients each try to write `N_BYTES` to their own stream.
    /// The test reads relay cells off the fake channel until it has seen
    /// `N_BYTES` of DATA in total, then verifies that every stream contributed
    /// at least `MIN_EXPECTED_BYTES_PER_STREAM` of them.
    #[traced_test]
    #[test]
    fn test_busy_stream_fairness() {
        // Number of streams to use.
        const N_STREAMS: usize = 3;
        // Number of cells (roughly) for each stream to send.
        const N_CELLS: usize = 20;
        // Number of bytes that *each* stream will send, and that we'll read
        // from the channel.
        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
        // Ignoring cell granularity, with perfect fairness we'd expect
        // `N_BYTES/N_STREAMS` bytes from each stream.
        //
        // We currently allow for up to a full cell less than that.  This is
        // somewhat arbitrary and can be changed as needed, since we don't
        // provide any specific fairness guarantees.
        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            // Run clients in a single task, doing our own round-robin
            // scheduling of writes to the reactor. Conversely, if we were to
            // put each client in its own task, we would be at the mercy of
            // how fairly the runtime schedules the client tasks, which is outside
            // the scope of this test.
            rt.spawn({
                // Clone the circuit to keep it alive after writers have
                // finished with it.
                let circ = circ.clone();
                async move {
                    let mut clients = VecDeque::new();
                    // One writer: its stream plus the bytes it still has to send.
                    struct Client {
                        stream: DataStream,
                        to_write: &'static [u8],
                    }
                    // Open the streams one after another; each one is given
                    // `N_BYTES` zero bytes to send.
                    for _ in 0..N_STREAMS {
                        clients.push_back(Client {
                            stream: circ
                                .begin_stream("www.example.com", 80, None)
                                .await
                                .unwrap(),
                            to_write: &[0_u8; N_BYTES][..],
                        });
                    }
                    // Round-robin: the client at the front of the queue gets
                    // one write, then goes to the back with whatever is left.
                    while let Some(mut client) = clients.pop_front() {
                        if client.to_write.is_empty() {
                            // Client is done. Don't put back in queue.
                            continue;
                        }
                        // Only the prefix that `write` reports as written is
                        // removed from the pending bytes.
                        let written = client.stream.write(client.to_write).await.unwrap();
                        client.to_write = &client.to_write[written..];
                        clients.push_back(client);
                    }
                }
            })
            .unwrap();
            // Play the far end of the channel: answer BEGINs and tally the
            // DATA bytes seen for each stream.
            let channel_handler_fut = async {
                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
                let mut total_bytes_received = 0;
                loop {
                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                    let rmsg = match msg {
                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
                            RelayCellFormat::V0,
                            r.into_relay_body(),
                        )
                        .unwrap(),
                        other => panic!("Unexpected chanmsg: {other:?}"),
                    };
                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                    match rmsg.cmd() {
                        RelayCmd::BEGIN => {
                            // Add an entry for this stream.
                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
                            assert_eq!(prev, None);
                            // Reply with a CONNECTED.
                            let connected = relaymsg::Connected::new_with_addr(
                                "10.0.0.1".parse().unwrap(),
                                1234,
                            )
                            .into();
                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                        }
                        RelayCmd::DATA => {
                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
                            let nbytes = data_msg.as_ref().len();
                            total_bytes_received += nbytes;
                            let streamid = streamid.unwrap();
                            // Panics if DATA arrives on a stream we never saw
                            // a BEGIN for.
                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
                            *stream_bytes += nbytes;
                            // Stop once we have seen as many bytes in total as
                            // a single stream intends to send.
                            if total_bytes_received >= N_BYTES {
                                break;
                            }
                        }
                        RelayCmd::END => {
                            // Stream is done. If fair scheduling is working as
                            // expected we *probably* shouldn't get here, but we
                            // can ignore it and save the failure until we
                            // actually have the final stats.
                            continue;
                        }
                        other => {
                            panic!("Unexpected command {other:?}");
                        }
                    }
                }
                // Return our stats, along with the `rx` and `sink` to keep the
                // reactor alive (since clients could still be writing).
                (total_bytes_received, stream_bytes_received, rx, sink)
            };
            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
                channel_handler_fut.await;
            // Every stream must have been opened, and each must have been
            // given its minimum share of the bytes we read.
            assert_eq!(stream_bytes_received.len(), N_STREAMS);
            for (sid, stream_bytes) in stream_bytes_received {
                assert!(
                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
                );
            }
        });
    }
    /// `CircParameters` defaults to extending by ed25519 identity, and that
    /// flag can be switched off.
    #[test]
    fn basic_params() {
        use super::CircParameters;

        let mut params = CircParameters::default();
        // Enabled out of the box...
        assert!(params.extend_by_ed25519_id);

        // ...and plainly assignable.
        params.extend_by_ed25519_id = false;
        assert!(!params.extend_by_ed25519_id);
    }
    /// Stream request filter for the tests below: accepts every incoming
    /// request without looking at it.
    #[cfg(feature = "hs-service")]
    struct AllowAllStreamsFilter;

    #[cfg(feature = "hs-service")]
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
        /// Always answer `Accept`; neither argument is inspected.
        fn disposition(
            &mut self,
            _request: &crate::stream::IncomingStreamRequestContext<'_>,
            _view: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
            use crate::stream::IncomingStreamRequestDisposition as Disposition;

            Ok(Disposition::Accept)
        }
    }
    /// Asking a circuit for incoming stream requests a second time, while the
    /// first registration is still alive, must fail.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests_twice() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, _send) = newcirc(&rt, chan).await;

            // The first registration has to go through. Its handle stays in
            // scope for the rest of the test.
            let first_attempt = circ
                .allow_stream_requests(
                    &[RelayCmd::BEGIN],
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await;
            let _incoming = first_attempt.unwrap();

            // An identical second registration has to be refused:
            // there can only be one IncomingStream at a time on any given circuit.
            let second_attempt = circ
                .allow_stream_requests(
                    &[RelayCmd::BEGIN],
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await;
            assert!(second_attempt.is_err());
        });
    }
    /// Exercise the happy path for incoming streams: the far end sends a BEGIN,
    /// the "service" side accepts it as a data stream, and then reads back the
    /// bytes carried by a following DATA cell.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;
            let rfmt = RelayCellFormat::V0;
            // A helper channel for coordinating the "client"/"service" interaction
            let (tx, rx) = oneshot::channel();
            // Register for BEGIN requests arriving from the last hop, accepting
            // all of them.
            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                // Wait for the request, and accept it with an empty CONNECTED.
                let stream = incoming.next().await.unwrap();
                let mut data_stream = stream
                    .accept_data(relaymsg::Connected::new_empty())
                    .await
                    .unwrap();
                // Notify the client task we're ready to accept DATA cells
                tx.send(()).unwrap();
                // Read the data the client sent us
                let mut buf = [0_u8; TEST_DATA.len()];
                data_stream.read_exact(&mut buf).await.unwrap();
                assert_eq!(&buf, TEST_DATA);
                // Hand the circuit back so that it outlives this future.
                circ
            };
            let simulate_client = async move {
                // Build a BEGIN cell on stream 12; the DATA cell below uses the
                // same stream ID.
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                // Pretend to be a client at the other end of the circuit sending a begin cell
                send.send(ClientCircChanMsg::Relay(begin_msg))
                    .await
                    .unwrap();
                // Wait until the service is ready to accept data
                // TODO: we shouldn't need to wait! This is needed because the service will reject
                // any DATA cells that aren't associated with a known stream. We need to wait until
                // the service receives our BEGIN cell (and the reactor updates hop.map with the
                // new stream).
                rx.await.unwrap();
                // Now send some data along the newly established stream.
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);
                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
                // Hand the sender back so that it outlives this future.
                send
            };
            // Drive both sides concurrently; the results are bound only so that
            // the circuit and the sender stay alive until both are finished.
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    /// Check that rejecting an incoming stream request does not prevent a
    /// later request from being accepted.
    ///
    /// The far end sends the same BEGIN cell (stream 12) twice. The "service"
    /// rejects the first request and accepts the second, then reads the bytes
    /// of the DATA cell that follows.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn accept_stream_after_reject() {
        use tor_cell::relaycell::msg::BeginFlags;
        use tor_cell::relaycell::msg::EndReason;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            const STREAM_COUNT: usize = 2;
            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;
            // A helper channel for coordinating the "client"/"service" interaction
            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
            // Register for BEGIN requests arriving from the last hop, letting
            // the filter accept all of them.
            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop_num().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                // Process 2 incoming streams
                for i in 0..STREAM_COUNT {
                    let stream = incoming.next().await.unwrap();
                    // Reject the first one
                    if i == 0 {
                        stream
                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                            .await
                            .unwrap();
                        // Notify the client
                        tx.send(()).await.unwrap();
                        continue;
                    }
                    // Accept the second one with an empty CONNECTED.
                    let mut data_stream = stream
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    // Notify the client task we're ready to accept DATA cells
                    tx.send(()).await.unwrap();
                    // Read the data the client sent us
                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }
                // Hand the circuit back so that it outlives this future.
                circ
            };
            let simulate_client = async move {
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                // Pretend to be a client at the other end of the circuit sending 2 identical begin
                // cells (the first one will be rejected by the test service).
                for _ in 0..STREAM_COUNT {
                    send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
                        .await
                        .unwrap();
                    // Wait until the service has dealt with this request: it
                    // notifies us after rejecting the first one and again
                    // after accepting the second.
                    rx.next().await.unwrap();
                }
                // Now send some data along the newly accepted stream (same
                // stream ID as the BEGIN cells).
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);
                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
                // Hand the sender back so that it outlives this future.
                send
            };
            // Drive both sides concurrently; the results are bound only so that
            // the circuit and the sender stay alive until both are finished.
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    /// A BEGIN cell that arrives from a different hop than the one registered
    /// for incoming streams must end the stream of incoming requests.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn incoming_stream_bad_hop() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            /// Expect the originator of the BEGIN cell to be hop 1.
            const EXPECTED_HOP: u8 = 1;
            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;

            // Only stream requests originating at EXPECTED_HOP are welcome.
            let mut incoming = circ
                .allow_stream_requests(
                    &[RelayCmd::BEGIN],
                    EXPECTED_HOP.into(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            let service_side = async move {
                // The cell really comes from the last hop of the circuit rather
                // than hop 1, so the reactor is expected to shut down and the
                // request stream to end without yielding anything.
                let next_request = incoming.next().await;
                assert!(next_request.is_none());
                circ
            };

            let client_side = async move {
                // Assemble a BEGIN cell for stream 12.
                let begin_msg = {
                    let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                    let outer = AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin));
                    let body: BoxedCellBody = outer.encode(rfmt, &mut testing_rng()).unwrap();
                    chanmsg::Relay::from(body)
                };
                // Act as the party at the far end of the circuit and send it.
                send.send(ClientCircChanMsg::Relay(begin_msg))
                    .await
                    .unwrap();
                send
            };

            // Run both sides to completion, keeping the circuit and the sender
            // alive until then.
            let (_circ, _send) = futures::join!(service_side, client_side);
        });
    }
}