1
//! Multi-hop paths over the Tor network.
//!
//! Right now, we only implement "client circuits" -- also sometimes
//! called "origin circuits".  A client circuit is one that is
//! constructed by this Tor instance, and used on its own behalf to
//! send data over the Tor network.
//!
//! Each circuit has multiple hops over the Tor network: each hop
//! knows only the hop before and the hop after.  The client shares a
//! separate set of keys with each hop.
//!
//! To build a circuit, first create a [crate::channel::Channel], then
//! call its [crate::channel::Channel::new_circ] method.  This yields
//! a [PendingClientCirc] object that won't become live until you call
//! one of the methods that extends it to its first hop.  After you've
//! done that, you can call [ClientCirc::extend_ntor] on the circuit to
//! build it into a multi-hop circuit.  Finally, you can use
//! [ClientCirc::begin_stream] to get a Stream object that can be used
//! for anonymized data.
//!
//! # Implementation
//!
//! Each open circuit has a corresponding Reactor object that runs in
//! an asynchronous task, and manages incoming cells from the
//! circuit's upstream channel.  These cells are either RELAY cells or
//! DESTROY cells.  DESTROY cells are handled immediately.
//! RELAY cells are either for a particular stream, in which case they
//! get forwarded to a RawCellStream object, or for no particular stream,
//! in which case they are considered "meta" cells (like EXTENDED2)
//! that should only get accepted if something is waiting for them.
//!
//! # Limitations
//!
//! This is client-only.
//!
//! There's one big mutex on the whole circuit: the reactor needs to hold
//! it to process a cell, and streams need to hold it to send.
//!
//! There is no flow-control or rate-limiting or fairness.
40

            
41
pub(crate) mod celltypes;
42
pub(crate) mod halfcirc;
43
mod halfstream;
44
#[cfg(feature = "hs-common")]
45
pub mod handshake;
46
#[cfg(feature = "send-control-msg")]
47
mod msghandler;
48
mod path;
49
pub(crate) mod reactor;
50
pub(crate) mod sendme;
51
mod streammap;
52
mod unique_id;
53

            
54
use crate::channel::Channel;
55
use crate::circuit::celltypes::*;
56
use crate::circuit::reactor::{
57
    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
58
};
59
pub use crate::circuit::unique_id::UniqId;
60
pub use crate::crypto::binding::CircuitBinding;
61
use crate::crypto::cell::HopNum;
62
#[cfg(feature = "ntor_v3")]
63
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
64
use crate::stream::{
65
    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
66
    StreamReader,
67
};
68
use crate::{Error, ResolveError, Result};
69
use educe::Educe;
70
use tor_cell::chancell::msg::HandshakeType;
71
use tor_cell::{
72
    chancell::{self, msg::AnyChanMsg, CircId},
73
    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
74
};
75

            
76
use tor_error::{bad_api_usage, internal, into_internal};
77
use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
78

            
79
#[cfg(feature = "hs-service")]
80
use {
81
    crate::circuit::reactor::IncomingStreamRequestContext,
82
    crate::stream::{IncomingCmdChecker, IncomingStream},
83
};
84

            
85
use futures::channel::mpsc;
86
use tor_async_utils::oneshot;
87

            
88
use crate::circuit::sendme::StreamRecvWindow;
89
use futures::{FutureExt as _, SinkExt as _};
90
use std::net::IpAddr;
91
use std::sync::{Arc, Mutex};
92
use tor_cell::relaycell::StreamId;
93
// use std::time::Duration;
94

            
95
use crate::crypto::handshake::ntor::NtorPublicKey;
96
pub use path::{Path, PathEntry};
97

            
98
/// The size of the bounded buffers used for communication between a
/// `ClientCirc` (or one of its streams) and the circuit reactor.
///
/// Within this file it is the capacity of the per-stream channel that carries
/// outgoing relay messages from a stream to the reactor; the circuit's
/// control channel itself is unbounded.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
100

            
101
#[cfg(feature = "send-control-msg")]
102
use reactor::MetaCellHandler;
103

            
104
#[cfg(feature = "send-control-msg")]
105
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
106
pub use {
107
    msghandler::MsgHandler,
108
    reactor::{ConversationInHandler, MetaCellDisposition},
109
};
110

            
111
#[derive(Debug)]
112
/// A circuit that we have constructed over the Tor network.
113
///
114
/// # Circuit life cycle
115
///
116
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
117
/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
118
/// one of these, you invoke one of its `create_firsthop` methods (currently
119
/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
120
/// [`create_firsthop_ntor()`](PendingClientCirc::create_firsthop_ntor)).
121
/// Then, to add more hops to the circuit, you can call
122
/// [`extend_ntor()`](ClientCirc::extend_ntor) on it.
123
///
124
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
125
/// `tor-proto` are probably not what you need.
126
///
127
/// After a circuit is created, it will persist until it is closed in one of
128
/// five ways:
129
///    1. A remote error occurs.
130
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
131
///       circuit.
132
///    3. The circuit's channel is closed.
133
///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
134
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
135
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
136
///       circuit from closing until all those streams have gone away.)
137
///
138
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
139
/// will just be unusable for most purposes.  Most operations on it will fail
140
/// with an error.
141
//
142
// Effectively, this struct contains two Arcs: one for `path` and one for
143
// `control` (which surely has something Arc-like in it).  We cannot unify
144
// these by putting a single Arc around the whole struct, and passing
145
// an Arc strong reference to the `Reactor`, because then `control` would
146
// not be dropped when the last user of the circuit goes away.  We could
147
// make the reactor have a weak reference but weak references are more
148
// expensive to dereference.
149
//
150
// Because of the above, cloning this struct is always going to involve
151
// two atomic refcount changes/checks.  Wrapping it in another Arc would
152
// be overkill.
153

            
154
pub struct ClientCirc {
155
    /// Mutable state shared with the `Reactor`.
156
    mutable: Arc<Mutex<MutableState>>,
157
    /// A unique identifier for this circuit.
158
    unique_id: UniqId,
159
    /// Channel to send control messages to the reactor.
160
    control: mpsc::UnboundedSender<CtrlMsg>,
161
    /// The channel that this ClientCirc is connected to and using to speak with
162
    /// its first hop.
163
    ///
164
    /// # Warning
165
    ///
166
    /// Don't use this field to send or receive any data, or perform any network
167
    /// operations for this circuit!  All network operations should be done by
168
    /// the circuit reactor.
169
    channel: Channel,
170
    /// A future that resolves to Cancelled once the reactor is shut down,
171
    /// meaning that the circuit is closed.
172
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
173
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
174
    /// For testing purposes: the CircId, for use in peek_circid().
175
    #[cfg(test)]
176
    circid: CircId,
177
}
178

            
179
/// Mutable state shared by [`ClientCirc`] and [`Reactor`].
180
#[derive(Educe)]
181
#[educe(Debug)]
182
struct MutableState {
183
    /// Information about this circuit's path.
184
    ///
185
    /// This is stored in an Arc so that we can cheaply give a copy of it to
186
    /// client code; when we need to add a hop (which is less frequent) we use
187
    /// [`Arc::make_mut()`].
188
    path: Arc<path::Path>,
189

            
190
    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
191
    /// in the circuit's path.
192
    ///
193
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
194
    /// fair chance that this will change in the future, and I don't want other
195
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
196
    /// an `Option`.
197
    #[educe(Debug(ignore))]
198
    binding: Vec<Option<CircuitBinding>>,
199
}
200

            
201
/// A ClientCirc that needs to send a create cell and receive a created* cell.
202
///
203
/// To use one of these, call create_firsthop_fast() or create_firsthop_ntor()
204
/// to negotiate the cryptographic handshake with the first hop.
205
pub struct PendingClientCirc {
206
    /// A oneshot receiver on which we'll receive a CREATED* cell,
207
    /// or a DESTROY cell.
208
    recvcreated: oneshot::Receiver<CreateResponse>,
209
    /// The ClientCirc object that we can expose on success.
210
    circ: Arc<ClientCirc>,
211
}
212

            
213
/// Description of the network's current rules for building circuits.
///
/// The [`Default`] value uses an initial send window of 1000 and enables
/// extending by ed25519 identity.
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// Initial value to use for our outbound circuit-level windows.
    initial_send_window: u16,
    /// Whether we should include ed25519 identities when we send
    /// EXTEND2 cells.
    extend_by_ed25519_id: bool,
}

impl Default for CircParameters {
    /// Return the default parameter set: a send window of 1000, with ed25519
    /// identities included when extending.
    fn default() -> CircParameters {
        CircParameters {
            initial_send_window: 1000,
            extend_by_ed25519_id: true,
        }
    }
}
231

            
232
impl CircParameters {
233
    /// Override the default initial send window for these parameters.
234
    /// Gives an error on any value above 1000.
235
    ///
236
    /// You should probably not call this.
237
105
    pub fn set_initial_send_window(&mut self, v: u16) -> Result<()> {
238
105
        if v <= 1000 {
239
105
            self.initial_send_window = v;
240
105
            Ok(())
241
        } else {
242
            Err(Error::from(bad_api_usage!(
243
                "Tried to set an initial send window over 1000"
244
            )))
245
        }
246
105
    }
247

            
248
    /// Return the initial send window as set in this parameter set.
249
189
    pub fn initial_send_window(&self) -> u16 {
250
189
        self.initial_send_window
251
189
    }
252

            
253
    /// Override the default decision about whether to use ed25519
254
    /// identities in outgoing EXTEND2 cells.
255
    ///
256
    /// You should probably not call this.
257
105
    pub fn set_extend_by_ed25519_id(&mut self, v: bool) {
258
105
        self.extend_by_ed25519_id = v;
259
105
    }
260

            
261
    /// Return true if we're configured to extend by ed25519 ID; false
262
    /// otherwise.
263
84
    pub fn extend_by_ed25519_id(&self) -> bool {
264
84
        self.extend_by_ed25519_id
265
84
    }
266
}
267

            
268
/// A stream on a particular circuit.
269
76
#[derive(Clone, Debug)]
270
pub(crate) struct StreamTarget {
271
    /// Which hop of the circuit this stream is with.
272
    hop_num: HopNum,
273
    /// Reactor ID for this stream.
274
    stream_id: StreamId,
275
    /// Channel to send cells down.
276
    tx: mpsc::Sender<AnyRelayMsg>,
277
    /// Reference to the circuit that this stream is on.
278
    circ: Arc<ClientCirc>,
279
}
280

            
281
impl ClientCirc {
282
    /// Return a description of the first hop of this circuit.
283
    ///
284
    /// # Panics
285
    ///
286
    /// Panics if there is no first hop.  (This should be impossible outside of
287
    /// the tor-proto crate, but within the crate it's possible to have a
288
    /// circuit with no hops.)
289
63
    pub fn first_hop(&self) -> OwnedChanTarget {
290
63
        let first_hop = self
291
63
            .mutable
292
63
            .lock()
293
63
            .expect("poisoned lock")
294
63
            .path
295
63
            .first_hop()
296
63
            .expect("called first_hop on an un-constructed circuit");
297
63
        match first_hop {
298
63
            path::HopDetail::Relay(r) => r,
299
            #[cfg(feature = "hs-common")]
300
            path::HopDetail::Virtual => {
301
                panic!("somehow made a circuit with a virtual first hop.")
302
            }
303
        }
304
63
    }
305

            
306
    /// Return the [`HopNum`] of the last hop of this circuit.
307
    ///
308
    /// Returns an error if there is no last hop.  (This should be impossible outside of the
309
    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
310
    pub fn last_hop_num(&self) -> Result<HopNum> {
311
        Ok(self
312
            .mutable
313
            .lock()
314
            .expect("poisoned lock")
315
            .path
316
            .last_hop_num()
317
            .ok_or_else(|| internal!("no last hop index"))?)
318
    }
319

            
320
    /// Return a description of all the hops in this circuit.
321
    ///
322
    /// This method is **deprecated** for several reasons:
323
    ///   * It performs a deep copy.
324
    ///   * It ignores virtual hops.
325
    ///   * It's not so extensible.
326
    ///
327
    /// Use [`ClientCirc::path_ref()`] instead.
328
    #[deprecated(since = "0.11.1", note = "Use path_ref() instead.")]
329
    pub fn path(&self) -> Vec<OwnedChanTarget> {
330
        #[allow(clippy::unnecessary_filter_map)] // clippy is blind to the cfg
331
        self.mutable
332
            .lock()
333
            .expect("poisoned lock")
334
            .path
335
            .all_hops()
336
            .into_iter()
337
            .filter_map(|hop| match hop {
338
                path::HopDetail::Relay(r) => Some(r),
339
                #[cfg(feature = "hs-common")]
340
                path::HopDetail::Virtual => None,
341
            })
342
            .collect()
343
    }
344

            
345
    /// Return a [`Path`] object describing all the hops in this circuit.
346
    ///
347
    /// Note that this `Path` is not automatically updated if the circuit is
348
    /// extended.
349
    pub fn path_ref(&self) -> Arc<Path> {
350
        self.mutable.lock().expect("poisoned_lock").path.clone()
351
    }
352

            
353
    /// Return a reference to the channel that this circuit is connected to.
354
    ///
355
    /// A client circuit is always connected to some relay via a [`Channel`].
356
    /// That relay has to be the same relay as the first hop in the client's
357
    /// path.
358
21
    pub fn channel(&self) -> &Channel {
359
21
        &self.channel
360
21
    }
361

            
362
    /// Return the cryptographic material used to prove knowledge of a shared
363
    /// secret with with `hop`.
364
    ///
365
    /// See [`CircuitBinding`] for more information on how this is used.
366
    ///
367
    /// Return None if we have no circuit binding information for the hop, or if
368
    /// the hop does not exist.
369
    pub fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
370
        self.mutable
371
            .lock()
372
            .expect("poisoned lock")
373
            .binding
374
            .get::<usize>(hop.into())
375
            .cloned()
376
            .flatten()
377
        // NOTE: I'm not thrilled to have to copy this information, but we use
378
        // it very rarely, so it's not _that_ bad IMO.
379
    }
380

            
381
    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
    ///
    /// To use this:
    ///
    ///  0. Create an inter-task channel you'll use to receive
    ///     the outcome of your conversation,
    ///     and bundle it into a [`MsgHandler`].
    ///
    ///  1. Call `start_conversation`.
    ///     This will install your handler, for incoming messages,
    ///     and send the outgoing message (if you provided one).
    ///     After that, each message on the circuit
    ///     that isn't handled by the core machinery
    ///     is passed to your provided `reply_handler`.
    ///
    ///  2. Possibly call `send_msg` on the [`Conversation`],
    ///     from the call site of `start_conversation`,
    ///     possibly multiple times, from time to time,
    ///     to send further desired messages to the peer.
    ///
    ///  3. In your [`MsgHandler`], process the incoming messages.
    ///     You may respond by
    ///     sending additional messages
    ///     (using the [`ConversationInHandler`] provided to `MsgHandler::handle_msg`,
    ///     or, outside the handler, using the `Conversation`).
    ///     When the protocol exchange is finished,
    ///     `MsgHandler::handle_msg` should return
    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
    ///
    /// If you don't need the `Conversation` to send followup messages,
    /// you may simply drop it,
    /// and rely on the responses you get from your handler,
    /// on the channel from step 0 above.
    /// Your handler will remain installed and able to process incoming messages
    /// until it returns `ConversationFinished`.
    ///
    /// (If you don't want to accept any replies at all, it may be
    /// simpler to use [`ClientCirc::send_raw_msg`].)
    ///
    /// Note that it is quite possible to use this function to violate the tor
    /// protocol; most users of this API will not need to call it.  It is used
    /// to implement most of the onion service handshake.
    ///
    /// # Limitations
    ///
    /// Only one conversation may be active at any one time,
    /// for any one circuit.
    /// This generally means that this function should not be called
    /// on a circuit which might be shared with anyone else.
    ///
    /// Likewise, it is forbidden to try to extend the circuit,
    /// while the conversation is in progress.
    ///
    /// After the conversation has finished, the circuit may be extended.
    /// Or, `start_conversation` may be called again;
    /// but, in that case there will be a gap between the two conversations,
    /// during which no `MsgHandler` is installed,
    /// and unexpected incoming messages would close the circuit.
    ///
    /// If these restrictions are violated, the circuit will be closed with an error.
    ///
    /// ## Precise definition of the lifetime of a conversation
    ///
    /// A conversation is in progress from entry to `start_conversation`,
    /// until entry to the body of the [`MsgHandler::handle_msg`]
    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
    /// (*Entry* since `handle_msg` is synchronously embedded
    /// into the incoming message processing.)
    /// So you may start a new conversation as soon as you have the final response
    /// via your inter-task channel from (0) above.
    ///
    /// The lifetime relationship of the [`Conversation`],
    /// vs the handler returning `ConversationFinished`
    /// is not enforced by the type system.
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
    // at least while allowing sending followup messages from outside the handler.
    //
    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
    //   tor-proto interface.
    #[cfg(feature = "send-control-msg")]
    pub async fn start_conversation(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
        hop_num: HopNum,
    ) -> Result<Conversation<'_>> {
        let conversation = Conversation(self);

        // Wrap the caller's handler so that it is tied to `hop_num`, then
        // install it and send the initial message (if any) in one step.
        let handler = Box::new(msghandler::UserMsgHandler::new(hop_num, reply_handler));
        conversation.send_internal(msg, Some(handler)).await?;

        Ok(conversation)
    }
472

            
473
    /// Start an ad-hoc protocol exchange to the final hop on this circuit
    ///
    /// See the [`ClientCirc::start_conversation`] docs for more information.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the circuit has no hops yet, in addition
    /// to any error from [`ClientCirc::start_conversation`].
    #[cfg(feature = "send-control-msg")]
    #[deprecated(since = "0.13.0", note = "Use start_conversation instead.")]
    pub async fn start_conversation_last_hop(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> Result<Conversation<'_>> {
        // Reuse the shared lookup instead of duplicating the lock-and-check
        // logic; the lock is released before we await anything.
        let last_hop = self.last_hop_num()?;

        self.start_conversation(msg, reply_handler, last_hop).await
    }
493

            
494
    /// Send an ad-hoc message to a given hop on the circuit, without expecting
    /// a reply.
    ///
    /// (If you want to handle one or more possible replies, see
    /// [`ClientCirc::start_conversation`].)
    ///
    /// # Errors
    ///
    /// Returns [`Error::CircuitClosed`] if the control message cannot be
    /// handed to the reactor or the reactor drops the reply channel; otherwise
    /// returns whatever result the reactor reports.
    #[cfg(feature = "send-control-msg")]
    pub async fn send_raw_msg(
        &self,
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
        hop_num: HopNum,
    ) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.control
            .unbounded_send(CtrlMsg::SendMsg {
                hop_num,
                msg,
                sender,
            })
            .map_err(|_| Error::CircuitClosed)?;

        // A dropped reply channel means the reactor has gone away.
        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
517

            
518
    /// Tell this circuit to begin allowing the hop `hop_num` of the circuit to
    /// try to create new Tor streams, and to return those pending requests in
    /// an asynchronous stream.
    ///
    /// Ordinarily, these requests are rejected.
    ///
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
    /// If such a [`Stream`](futures::Stream) already exists, this method will return
    /// an error.
    ///
    /// After this method has been called on a circuit, the circuit is expected
    /// to receive requests of this type indefinitely, until it is finally closed.
    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
    ///
    /// Only onion services (and, eventually, exit relays) should call this
    /// method.
    //
    // TODO: Someday, we might want to allow a stream request handler to be
    // un-registered.  However, nothing in the Tor protocol requires it.
    #[cfg(feature = "hs-service")]
    pub async fn allow_stream_requests(
        self: &Arc<ClientCirc>,
        allow_commands: &[tor_cell::relaycell::RelayCmd],
        hop_num: HopNum,
    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
        use futures::stream::StreamExt;

        /// The size of the channel receiving IncomingStreamRequestContexts.
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

        let (incoming_sender, incoming_receiver) = mpsc::channel(INCOMING_BUFFER);
        let (done, registered) = oneshot::channel();

        let request = CtrlMsg::AwaitStreamRequest {
            cmd_checker: IncomingCmdChecker::new_any(allow_commands),
            incoming_sender,
            hop_num,
            done,
        };
        self.control
            .unbounded_send(request)
            .map_err(|_| Error::CircuitClosed)?;

        // Check whether the AwaitStreamRequest was processed successfully.
        registered.await.map_err(|_| Error::CircuitClosed)??;

        let allowed_hop_num = hop_num;
        let circ = Arc::clone(self);

        let requests = incoming_receiver.map(
            move |IncomingStreamRequestContext {
                      req,
                      stream_id,
                      hop_num,
                      receiver,
                      msg_tx,
                  }| {
                // We already enforce this in handle_incoming_stream_request; this
                // assertion is just here to make sure that we don't ever
                // accidentally remove or fail to enforce that check, since it is
                // security-critical.
                assert_eq!(allowed_hop_num, hop_num);

                let target = StreamTarget {
                    circ: Arc::clone(&circ),
                    tx: msg_tx,
                    hop_num,
                    stream_id,
                };

                let reader = StreamReader {
                    target: target.clone(),
                    receiver,
                    recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
                    ended: false,
                };

                IncomingStream::new(req, target, reader)
            },
        );

        Ok(requests)
    }
599

            
600
    /// Extend the circuit via the ntor handshake to a new target last
601
    /// hop.
602
    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
603
    where
604
        Tg: CircTarget,
605
    {
606
        let key = NtorPublicKey {
607
            id: *target
608
                .rsa_identity()
609
                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
610
            pk: *target.ntor_onion_key(),
611
        };
612
        let mut linkspecs = target
613
            .linkspecs()
614
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
615
        if !params.extend_by_ed25519_id() {
616
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
617
        }
618

            
619
        let (tx, rx) = oneshot::channel();
620

            
621
        let peer_id = OwnedChanTarget::from_chan_target(target);
622
        self.control
623
            .unbounded_send(CtrlMsg::ExtendNtor {
624
                peer_id,
625
                public_key: key,
626
                linkspecs,
627
                params: params.clone(),
628
                done: tx,
629
            })
630
            .map_err(|_| Error::CircuitClosed)?;
631

            
632
        rx.await.map_err(|_| Error::CircuitClosed)??;
633

            
634
        Ok(())
635
    }
636

            
637
    /// Extend the circuit via the ntor v3 handshake to a new target last
    /// hop.
    ///
    /// If `params` says not to extend by ed25519 identity, the ed25519 link
    /// specifier is stripped from the set sent to the reactor.
    ///
    /// # Errors
    ///
    /// * [`Error::MissingId`] with [`RelayIdType::Ed25519`] if `target` has no
    ///   ed25519 identity.
    /// * An internal error if the target's link specifiers cannot be encoded.
    /// * [`Error::CircuitClosed`] if the reactor is gone, or whatever error the
    ///   reactor reports for the extension attempt.
    #[cfg(feature = "ntor_v3")]
    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
    where
        Tg: CircTarget,
    {
        let key = NtorV3PublicKey {
            id: *target
                .ed_identity()
                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
            pk: *target.ntor_onion_key(),
        };
        let mut linkspecs = target
            .linkspecs()
            .map_err(into_internal!(
                "Could not encode linkspecs for extend_ntor_v3"
            ))?;
        if !params.extend_by_ed25519_id() {
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
        }

        let (tx, rx) = oneshot::channel();

        let peer_id = OwnedChanTarget::from_chan_target(target);
        self.control
            .unbounded_send(CtrlMsg::ExtendNtorV3 {
                peer_id,
                public_key: key,
                linkspecs,
                params: params.clone(),
                done: tx,
            })
            .map_err(|_| Error::CircuitClosed)?;

        // Outer error: the reactor dropped the reply channel.
        // Inner error: the extension itself failed.
        rx.await.map_err(|_| Error::CircuitClosed)??;

        Ok(())
    }
674

            
675
    /// Extend this circuit by a single, "virtual" hop.
    ///
    /// A virtual hop is one for which we do not add an actual network connection
    /// between separate hosts (such as Relays).  We only add a layer of
    /// cryptography.
    ///
    /// This is used to implement onion services: the client and the service
    /// both build a circuit to a single rendezvous point, and tell the
    /// rendezvous point to relay traffic between their two circuits.  Having
    /// completed a [`handshake`] out of band[^1], the parties each extend their
    /// circuits by a single "virtual" encryption hop that represents their
    /// shared cryptographic context.
    ///
    /// Once a circuit has been extended in this way, it is an error to try to
    /// extend it in any other way.
    ///
    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
    ///     client sends their half of the handshake in an `INTRODUCE1` message, and the
    ///     service's response is inline in its `RENDEZVOUS2` message.
    //
    // TODO hs: let's try to enforce the "you can't extend a circuit again once
    // it has been extended this way" property.  We could do that with internal
    // state, or some kind of a type state pattern.
    //
    // TODO hs: possibly we should take a set of Protovers, and not just `Params`.
    #[cfg(feature = "hs-common")]
    pub async fn extend_virtual(
        &self,
        protocol: handshake::RelayProtocol,
        role: handshake::HandshakeRole,
        seed: impl handshake::KeyGenerator,
        params: CircParameters,
    ) -> Result<()> {
        // Derive both directions of cell crypto, plus the binding key, from
        // the handshake output.
        let handshake::BoxedClientLayer { fwd, back, binding } =
            protocol.construct_layers(role, seed)?;

        let (done, outcome) = oneshot::channel();

        self.control
            .unbounded_send(CtrlMsg::ExtendVirtual {
                cell_crypto: (fwd, back, binding),
                params,
                done,
            })
            .map_err(|_| Error::CircuitClosed)?;

        // A dropped reply channel means the reactor has gone away.
        match outcome.await {
            Ok(result) => result,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
725

            
726
    /// Helper, used to begin a stream.
727
    ///
728
    /// This function allocates a stream ID, and sends the message
729
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
730
    ///
731
    /// The caller will typically want to see the first cell in response,
732
    /// to see whether it is e.g. an END or a CONNECTED.
733
84
    async fn begin_stream_impl(
734
84
        self: &Arc<ClientCirc>,
735
84
        begin_msg: AnyRelayMsg,
736
84
        cmd_checker: AnyCmdChecker,
737
88
    ) -> Result<(StreamReader, StreamTarget)> {
738
        // TODO: Possibly this should take a hop, rather than just
739
        // assuming it's the last hop.
740

            
741
76
        let hop_num = self
742
76
            .mutable
743
76
            .lock()
744
76
            .expect("poisoned lock")
745
76
            .path
746
76
            .last_hop_num()
747
76
            .ok_or_else(|| Error::from(internal!("Can't begin a stream at the 0th hop")))?;
748

            
749
76
        let (sender, receiver) = mpsc::channel(STREAM_READER_BUFFER);
750
76
        let (tx, rx) = oneshot::channel();
751
76
        let (msg_tx, msg_rx) = mpsc::channel(CIRCUIT_BUFFER_SIZE);
752
76

            
753
76
        self.control
754
76
            .unbounded_send(CtrlMsg::BeginStream {
755
76
                hop_num,
756
76
                message: begin_msg,
757
76
                sender,
758
76
                rx: msg_rx,
759
76
                done: tx,
760
76
                cmd_checker,
761
76
            })
762
76
            .map_err(|_| Error::CircuitClosed)?;
763

            
764
114
        let stream_id = rx.await.map_err(|_| Error::CircuitClosed)??;
765

            
766
76
        let target = StreamTarget {
767
76
            circ: self.clone(),
768
76
            tx: msg_tx,
769
76
            hop_num,
770
76
            stream_id,
771
76
        };
772
76

            
773
76
        let reader = StreamReader {
774
76
            target: target.clone(),
775
76
            receiver,
776
76
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
777
76
            ended: false,
778
76
        };
779
76

            
780
76
        Ok((reader, target))
781
76
    }
782

            
783
    /// Start a DataStream (anonymized connection) to the given
784
    /// address and port, using a BEGIN cell.
785
84
    async fn begin_data_stream(
786
84
        self: &Arc<ClientCirc>,
787
84
        msg: AnyRelayMsg,
788
84
        optimistic: bool,
789
88
    ) -> Result<DataStream> {
790
76
        let (reader, target) = self
791
76
            .begin_stream_impl(msg, DataCmdChecker::new_any())
792
114
            .await?;
793
76
        let mut stream = DataStream::new(reader, target);
794
76
        if !optimistic {
795
19
            stream.wait_for_connection().await?;
796
57
        }
797
76
        Ok(stream)
798
76
    }
799

            
800
    /// Start a stream to the given address and port, using a BEGIN
801
    /// cell.
802
    ///
803
    /// The use of a string for the address is intentional: you should let
804
    /// the remote Tor relay do the hostname lookup for you.
805
21
    pub async fn begin_stream(
806
21
        self: &Arc<ClientCirc>,
807
21
        target: &str,
808
21
        port: u16,
809
21
        parameters: Option<StreamParameters>,
810
22
    ) -> Result<DataStream> {
811
7
        let parameters = parameters.unwrap_or_default();
812
7
        let begin_flags = parameters.begin_flags();
813
7
        let optimistic = parameters.is_optimistic();
814
7
        let target = if parameters.suppressing_hostname() {
815
            ""
816
        } else {
817
7
            target
818
        };
819
7
        let beginmsg = Begin::new(target, port, begin_flags)
820
7
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
821
21
        self.begin_data_stream(beginmsg.into(), optimistic).await
822
7
    }
823

            
824
    /// Start a new stream to the last relay in the circuit, using
825
    /// a BEGIN_DIR cell.
826
66
    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
827
57
        // Note that we always open begindir connections optimistically.
828
57
        // Since they are local to a relay that we've already authenticated
829
57
        // with and built a circuit to, there should be no additional checks
830
57
        // we need to perform to see whether the BEGINDIR will succeed.
831
57
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
832
76
            .await
833
57
    }
834

            
835
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
836
    /// in this circuit.
837
    ///
838
    /// Note that this function does not check for timeouts; that's
839
    /// the caller's responsibility.
840
    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
841
        let resolve_msg = Resolve::new(hostname);
842

            
843
        let resolved_msg = self.try_resolve(resolve_msg).await?;
844

            
845
        resolved_msg
846
            .into_answers()
847
            .into_iter()
848
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
849
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
850
                Ok(_) => None,
851
                Err(e) => Some(Err(e)),
852
            })
853
            .collect()
854
    }
855

            
856
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
857
    /// the last relay on this circuit.
858
    ///
859
    /// Note that this function does not check for timeouts; that's
860
    /// the caller's responsibility.
861
    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
862
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
863

            
864
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
865

            
866
        resolved_msg
867
            .into_answers()
868
            .into_iter()
869
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
870
                Ok(ResolvedVal::Hostname(v)) => Some(
871
                    String::from_utf8(v)
872
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
873
                ),
874
                Ok(_) => None,
875
                Err(e) => Some(Err(e)),
876
            })
877
            .collect()
878
    }
879

            
880
    /// Helper: Send the resolve message, and read resolved message from
881
    /// resolve stream.
882
    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
883
        let (reader, _) = self
884
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
885
            .await?;
886
        let mut resolve_stream = ResolveStream::new(reader);
887
        resolve_stream.read_msg().await
888
    }
889

            
890
    /// Shut down this circuit, along with all streams that are using it.
891
    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
892
    /// immediately after this function returns!).
893
    ///
894
    /// Note that other references to this circuit may exist.  If they
895
    /// do, they will stop working after you call this function.
896
    ///
897
    /// It's not necessary to call this method if you're just done
898
    /// with a circuit: the circuit should close on its own once nothing
899
    /// is using it any more.
900
    pub fn terminate(&self) {
901
        let _ = self.control.unbounded_send(CtrlMsg::Shutdown);
902
    }
903

            
904
    /// Called when a circuit-level protocol error has occurred and the
905
    /// circuit needs to shut down.
906
    ///
907
    /// This is a separate function because we may eventually want to have
908
    /// it do more than just shut down.
909
    ///
910
    /// As with `terminate`, this function is asynchronous.
911
    pub(crate) fn protocol_error(&self) {
912
        self.terminate();
913
    }
914

            
915
    /// Return true if this circuit is closed and therefore unusable.
916
336
    pub fn is_closing(&self) -> bool {
917
336
        self.control.is_closed()
918
336
    }
919

            
920
    /// Return a process-unique identifier for this circuit.
921
378
    pub fn unique_id(&self) -> UniqId {
922
378
        self.unique_id
923
378
    }
924

            
925
    /// Return the number of hops in this circuit.
926
    ///
927
    /// NOTE: This function will currently return only the number of hops
928
    /// _currently_ in the circuit. If there is an extend operation in progress,
929
    /// the currently pending hop may or may not be counted, depending on whether
930
    /// the extend operation finishes before this call is done.
931
    pub fn n_hops(&self) -> usize {
932
        self.mutable.lock().expect("poisoned lock").path.n_hops()
933
    }
934

            
935
    /// Return a future that will resolve once this circuit has closed.
    ///
    /// The future is built from a clone of the circuit's shared
    /// "reactor closed" receiver, so it does not borrow from `self`.
    ///
    /// Note that this method does not itself cause the circuit to shut down.
    ///
    /// TODO: Perhaps this should return some kind of status indication instead
    /// of just ()
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        let closed = self.reactor_closed_rx.clone();
        // Whatever the receiver yields is thrown away; only completion matters.
        closed.map(|_outcome| ())
    }
945
}
946

            
947
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientCirc::start_conversation`],
/// and used to send messages to the last hop relay.
///
/// See also [`ConversationInHandler`], which is a type used for the same purpose
/// but available only inside [`MsgHandler::handle_msg`].
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);

#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `MsgHandler` set up
    /// when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        // No handler is passed here: only the message is sent.
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`.
    ///
    /// `msg`, if present, is wrapped in an `AnyRelayMsgOuter` with no stream ID.
    /// Both `msg` and `handler` are passed to the reactor unchanged otherwise.
    ///
    /// Fails with [`Error::CircuitClosed`] if the reactor cannot be reached or
    /// drops the request without answering; an error reported by the reactor
    /// is returned as-is.
    async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg: msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner)),
            handler,
            sender,
        };
        self.0
            .control
            .unbounded_send(request)
            .map_err(|_| Error::CircuitClosed)?;

        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
993

            
994
impl PendingClientCirc {
995
    /// Instantiate a new circuit object: used from Channel::new_circ().
996
    ///
997
    /// Does not send a CREATE* cell on its own.
998
    ///
999
    ///
105
    pub(crate) fn new(
105
        id: CircId,
105
        channel: Channel,
105
        createdreceiver: oneshot::Receiver<CreateResponse>,
105
        input: mpsc::Receiver<ClientCircChanMsg>,
105
        unique_id: UniqId,
105
    ) -> (PendingClientCirc, reactor::Reactor) {
105
        let (reactor, control_tx, reactor_closed_rx, mutable) =
105
            Reactor::new(channel.clone(), id, unique_id, input);
105

            
105
        let circuit = ClientCirc {
105
            mutable,
105
            unique_id,
105
            control: control_tx,
105
            reactor_closed_rx: reactor_closed_rx.shared(),
105
            channel,
105
            #[cfg(test)]
105
            circid: id,
105
        };
105

            
105
        let pending = PendingClientCirc {
105
            recvcreated: createdreceiver,
105
            circ: Arc::new(circuit),
105
        };
105
        (pending, reactor)
105
    }
    /// Testing only: Extract the circuit ID for this pending circuit.
    #[cfg(test)]
    pub(crate) fn peek_circid(&self) -> CircId {
        self.circ.circid
    }
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
    /// first hop of this circuit.
    ///
    /// There's no authentication in CRATE_FAST,
    /// so we don't need to know whom we're connecting to: we're just
    /// connecting to whichever relay the channel is for.
66
    pub async fn create_firsthop_fast(self, params: &CircParameters) -> Result<Arc<ClientCirc>> {
57
        let (tx, rx) = oneshot::channel();
57
        self.circ
57
            .control
57
            .unbounded_send(CtrlMsg::Create {
57
                recv_created: self.recvcreated,
57
                handshake: CircuitHandshake::CreateFast,
57
                params: params.clone(),
57
                done: tx,
57
            })
57
            .map_err(|_| Error::CircuitClosed)?;
57
        rx.await.map_err(|_| Error::CircuitClosed)??;
57
        Ok(self.circ)
57
    }
    /// Use the ntor handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
    pub async fn create_firsthop_ntor<Tg>(
        self,
        target: &Tg,
        params: CircParameters,
    ) -> Result<Arc<ClientCirc>>
    where
        Tg: tor_linkspec::CircTarget,
    {
        let (tx, rx) = oneshot::channel();
        self.circ
            .control
            .unbounded_send(CtrlMsg::Create {
                recv_created: self.recvcreated,
                handshake: CircuitHandshake::Ntor {
                    public_key: NtorPublicKey {
                        id: *target
                            .rsa_identity()
                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
                        pk: *target.ntor_onion_key(),
                    },
                    ed_identity: *target
                        .ed_identity()
                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
                },
                params: params.clone(),
                done: tx,
            })
            .map_err(|_| Error::CircuitClosed)?;
        rx.await.map_err(|_| Error::CircuitClosed)??;
        Ok(self.circ)
    }
    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
    ///
    /// Assumes that the target supports ntor_v3. The caller should verify
    /// this before calling this function, e.g. by validating that the target
    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
    ///
    /// Note that the provided 'target' must match the channel's target,
    /// or the handshake will fail.
    #[cfg(feature = "ntor_v3")]
38
    pub async fn create_firsthop_ntor_v3<Tg>(
38
        self,
38
        target: &Tg,
38
        params: CircParameters,
38
    ) -> Result<Arc<ClientCirc>>
38
    where
38
        Tg: tor_linkspec::CircTarget,
38
    {
38
        let (tx, rx) = oneshot::channel();
38

            
38
        self.circ
38
            .control
38
            .unbounded_send(CtrlMsg::Create {
38
                recv_created: self.recvcreated,
38
                handshake: CircuitHandshake::NtorV3 {
38
                    public_key: NtorV3PublicKey {
38
                        id: *target
38
                            .ed_identity()
38
                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
38
                        pk: *target.ntor_onion_key(),
38
                    },
38
                },
38
                params: params.clone(),
38
                done: tx,
38
            })
38
            .map_err(|_| Error::CircuitClosed)?;
38
        rx.await.map_err(|_| Error::CircuitClosed)??;
38
        Ok(self.circ)
38
    }
}
/// An object that can put a given handshake into a ChanMsg for a CREATE*
/// cell, and unwrap a CREATED* cell.
trait CreateHandshakeWrap {
    /// Construct an appropriate ChanMsg to hold this kind of handshake.
    ///
    /// `bytes` is the encoded handshake to carry in the message.
    fn to_chanmsg(&self, bytes: Vec<u8>) -> AnyChanMsg;
    /// Decode a ChanMsg to an appropriate handshake value, checking
    /// its type.
    ///
    /// Returns the handshake bytes carried by `msg`, or an error if `msg`
    /// is not the kind of reply this wrapper expects.
    fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>>;
}
/// A CreateHandshakeWrap that generates CREATE_FAST and handles CREATED_FAST.
struct CreateFastWrap;
impl CreateHandshakeWrap for CreateFastWrap {
63
    fn to_chanmsg(&self, bytes: Vec<u8>) -> AnyChanMsg {
63
        chancell::msg::CreateFast::new(bytes).into()
63
    }
63
    fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>> {
63
        use CreateResponse::*;
63
        match msg {
63
            CreatedFast(m) => Ok(m.into_handshake()),
            Destroy(_) => Err(Error::CircRefused(
                "Relay replied to CREATE_FAST with DESTROY.",
            )),
            _ => Err(Error::CircProto(format!(
                "Relay replied to CREATE_FAST with unexpected cell: {}",
                msg
            ))),
        }
63
    }
}
/// A CreateHandshakeWrap that generates CREATE2 and handles CREATED2
struct Create2Wrap {
    /// The handshake type to put in the CREATE2 cell.
    handshake_type: HandshakeType,
}
impl CreateHandshakeWrap for Create2Wrap {
42
    fn to_chanmsg(&self, bytes: Vec<u8>) -> AnyChanMsg {
42
        chancell::msg::Create2::new(self.handshake_type, bytes).into()
42
    }
42
    fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>> {
42
        use CreateResponse::*;
42
        match msg {
42
            Created2(m) => Ok(m.into_body()),
            Destroy(_) => Err(Error::CircRefused("Relay replied to CREATE2 with DESTROY.")),
            _ => Err(Error::CircProto(format!(
                "Relay replied to CREATE2 with unexpected cell {}",
                msg
            ))),
        }
42
    }
}
impl StreamTarget {
    /// Deliver a relay message for the stream that owns this StreamTarget.
    ///
    /// The StreamTarget will set the correct stream ID and pick the
    /// right hop, but will not validate that the message is well-formed
    /// or meaningful in context.
88
    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
84
        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
84
        Ok(())
84
    }
    /// Close the pending stream that owns this StreamTarget, delivering the specified
    /// END message (if any)
    ///
    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
    ///
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
    ///
    /// The StreamTarget will set the correct stream ID and pick the
    /// right hop, but will not validate that the message is well-formed
    /// or meaningful in context.
    ///
    /// Note that in many cases, the actual contents of an END message can leak unwanted
    /// information. Please consider carefully before sending anything but an
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
    ///
    /// In addition to sending the END message, this function also ensures
    /// the state of the stream map entry of this stream is updated
    /// accordingly.
    ///
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
    /// received the message initiating the stream but have not responded to it yet).
    ///
    /// **NOTE**: This function should be called at most once per request.
    /// Calling it twice is an error.
    #[cfg(feature = "hs-service")]
    pub(crate) fn close_pending(
        &self,
        message: reactor::CloseStreamBehavior,
    ) -> Result<oneshot::Receiver<Result<()>>> {
        let (tx, rx) = oneshot::channel();
        self.circ
            .control
            .unbounded_send(CtrlMsg::ClosePendingStream {
                stream_id: self.stream_id,
                hop_num: self.hop_num,
                message,
                done: tx,
            })
            .map_err(|_| Error::CircuitClosed)?;
        Ok(rx)
    }
    /// Called when a circuit-level protocol error has occurred and the
    /// circuit needs to shut down.
    pub(crate) fn protocol_error(&mut self) {
        self.circ.protocol_error();
    }
    /// Send a SENDME cell for this stream.
    pub(crate) fn send_sendme(&mut self) -> Result<()> {
        self.circ
            .control
            .unbounded_send(CtrlMsg::SendSendme {
                stream_id: self.stream_id,
                hop_num: self.hop_num,
            })
            .map_err(|_| Error::CircuitClosed)?;
        Ok(())
    }
    /// Return a reference to the circuit that this `StreamTarget` is using.
    #[cfg(feature = "experimental-api")]
168
    pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
168
        &self.circ
168
    }
}
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
/// it represents an error.
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
    match val {
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
        _ => Ok(val),
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::channel::OpenChanCellS2C;
    use crate::channel::{test::new_reactor, CodecError};
    use crate::crypto::cell::RelayCellBody;
    #[cfg(feature = "ntor_v3")]
    use crate::crypto::handshake::ntor_v3::NtorV3Server;
    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
    use futures::channel::mpsc::{Receiver, Sender};
    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::sink::SinkExt;
    use futures::stream::StreamExt;
    use futures::task::SpawnExt;
    use hex_literal::hex;
    use std::time::Duration;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody};
    use tor_cell::relaycell::extend::NtorV3Extension;
    use tor_cell::relaycell::{msg as relaymsg, AnyRelayMsgOuter, StreamId};
    use tor_linkspec::OwnedCircTarget;
    use tor_rtcompat::{Runtime, SleepProvider};
    use tracing::trace;
    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
            .encode(&mut testing_rng())
            .unwrap();
        let chanmsg = chanmsg::Relay::from(body);
        ClientCircChanMsg::Relay(chanmsg)
    }
    // Example relay IDs and keys
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
    /// return an example OwnedCircTarget that can get used for an ntor handshake.
    fn example_target() -> OwnedCircTarget {
        let mut builder = OwnedCircTarget::builder();
        builder
            .chan_target()
            .ed_identity(EXAMPLE_ED_ID.into())
            .rsa_identity(EXAMPLE_RSA_ID.into());
        builder
            .ntor_onion_key(EXAMPLE_PK.into())
            .protocols("FlowCtrl=1".parse().unwrap())
            .build()
            .unwrap()
    }
    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
        crate::crypto::handshake::ntor::NtorSecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_RSA_ID.into(),
        )
    }
    #[cfg(feature = "ntor_v3")]
    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_ED_ID.into(),
        )
    }
    fn working_fake_channel<R: Runtime>(
        rt: &R,
    ) -> (
        Channel,
        Receiver<AnyChanCell>,
        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    ) {
        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
        rt.spawn(async {
            let _ignore = chan_reactor.run().await;
        })
        .unwrap();
        (channel, rx, tx)
    }
    /// Which handshake type to use.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        Fast,
        Ntor,
        #[cfg(feature = "ntor_v3")]
        NtorV3,
    }
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        // We want to try progressing from a pending circuit to a circuit
        // via the requested CREATE handshake (CREATE_FAST, ntor, or ntor_v3).
        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = mpsc::channel(64);
        let unique_id = UniqId::new(23, 17);
        let (pending, reactor) =
            PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // Future to pretend to be a relay on the other end of the circuit.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), CircId::new(128));
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        _ => panic!(),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        _ => panic!(),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                #[cfg(feature = "ntor_v3")]
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        _ => panic!(),
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &_| Some(vec![]),
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            created_send.send(reply).unwrap();
        };
        // Future to pretend to be a client.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(&params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                #[cfg(feature = "ntor_v3")]
                HandshakeType::NtorV3 => {
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
        let _circ = circ.unwrap();
        // pfew!  We've build a circuit!  Let's make sure it has one hop.
        /* TODO: reinstate this.
        let inner = Arc::get_mut(&mut circuit).unwrap().c.into_inner();
        assert_eq!(inner.hops.len(), 1);
         */
    }
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast).await;
        });
    }
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor).await;
        });
    }
    #[cfg(feature = "ntor_v3")]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3).await;
        });
    }
    // An encryption layer that doesn't do any crypto.   Can be used
    // as inbound or outbound, but not both at once.
    pub(crate) struct DummyCrypto {
        counter_tag: [u8; 20],
        counter: u32,
        lasthop: bool,
    }
    impl DummyCrypto {
        fn next_tag(&mut self) -> &[u8; 20] {
            #![allow(clippy::identity_op)]
            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;