Skip to main content

tor_proto/
channel.rs

1//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2//!
3//! Channels form the basis of the rest of the Tor protocol: they are
4//! the only way for two Tor instances to talk.
5//!
6//! Channels are not useful directly for application requests: after
7//! making a channel, it needs to get used to build circuits, and the
8//! circuits are used to anonymize streams.  The streams are the
9//! objects corresponding to directory requests.
10//!
11//! In general, you shouldn't try to manage channels on your own;
12//! use the `tor-chanmgr` crate instead.
13//!
14//! To launch a channel:
15//!
16//!  * Create a TLS connection as an object that implements AsyncRead +
17//!    AsyncWrite + StreamOps, and pass it to a [ChannelBuilder].  This will
18//!    yield an [crate::client::channel::handshake::ClientInitiatorHandshake] that represents
19//!    the state of the handshake.
20//!  * Call [crate::client::channel::handshake::ClientInitiatorHandshake::connect] on the result
21//!    to negotiate the rest of the handshake.  This will verify
22//!    syntactic correctness of the handshake, but not its cryptographic
23//!    integrity.
24//!  * Call handshake::UnverifiedChannel::check on the result.  This
25//!    finishes the cryptographic checks.
26//!  * Call handshake::VerifiedChannel::finish on the result. This
27//!    completes the handshake and produces an open channel and Reactor.
28//!  * Launch an asynchronous task to call the reactor's run() method.
29//!
30//! One you have a running channel, you can create circuits on it with
31//! its [Channel::new_tunnel] method.  See
32//! [crate::client::circuit::PendingClientTunnel] for information on how to
33//! proceed from there.
34//!
35//! # Design
36//!
37//! For now, this code splits the channel into two pieces: a "Channel"
38//! object that can be used by circuits to write cells onto the
39//! channel, and a "Reactor" object that runs as a task in the
40//! background, to read channel cells and pass them to circuits as
41//! appropriate.
42//!
43//! I'm not at all sure that's the best way to do that, but it's what
44//! I could think of.
45//!
46//! # Limitations
47//!
48//! TODO: There is no rate limiting or fairness.
49
50/// The size of the channel buffer for communication between `Channel` and its reactor.
51pub const CHANNEL_BUFFER_SIZE: usize = 128;
52
53mod circmap;
54mod handler;
55pub(crate) mod handshake;
56pub mod kist;
57mod msg;
58pub mod padding;
59pub mod params;
60mod reactor;
61mod unique_id;
62
63pub use crate::channel::params::*;
64pub(crate) use crate::channel::reactor::Reactor;
65use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream};
66pub use crate::channel::unique_id::UniqId;
67use crate::client::channel::ClientChannelBuilder;
68use crate::client::circuit::PendingClientTunnel;
69use crate::client::circuit::padding::{PaddingController, QueuedCellPaddingInfo};
70use crate::memquota::{ChannelAccount, CircuitAccount, SpecificAccount as _};
71use crate::util::err::ChannelClosed;
72use crate::util::oneshot_broadcast;
73use crate::util::timeout::TimeoutEstimator;
74use crate::util::ts::AtomicOptTimestamp;
75use crate::{ClockSkew, client};
76use crate::{Error, Result};
77use cfg_if::cfg_if;
78use reactor::BoxedChannelStreamOps;
79use safelog::sensitive as sv;
80use std::future::{Future, IntoFuture};
81use std::net::IpAddr;
82use std::pin::Pin;
83use std::sync::{Mutex, MutexGuard};
84use std::time::Duration;
85use tor_cell::chancell::ChanMsg;
86use tor_cell::chancell::msg::AnyChanMsg;
87use tor_cell::chancell::{AnyChanCell, CircId, msg::Netinfo, msg::PaddingNegotiate};
88use tor_error::internal;
89use tor_linkspec::{HasRelayIds, OwnedChanTarget};
90use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
91use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider, StreamOps};
92
93#[cfg(feature = "circ-padding")]
94use tor_async_utils::counting_streams::{self, CountingSink, CountingStream};
95
96#[cfg(feature = "relay")]
97use crate::circuit::CircuitRxReceiver;
98
99/// Imports that are re-exported pub if feature `testing` is enabled
100///
101/// Putting them together in a little module like this allows us to select the
102/// visibility for all of these things together.
103mod testing_exports {
104    #![allow(unreachable_pub)]
105    pub use super::reactor::CtrlMsg;
106    pub use crate::circuit::celltypes::CreateResponse;
107}
108#[cfg(feature = "testing")]
109pub use testing_exports::*;
110#[cfg(not(feature = "testing"))]
111use testing_exports::*;
112
113use asynchronous_codec;
114use futures::channel::mpsc;
115use futures::io::{AsyncRead, AsyncWrite};
116use oneshot_fused_workaround as oneshot;
117
118use educe::Educe;
119use futures::{FutureExt as _, Sink};
120use std::result::Result as StdResult;
121use std::sync::Arc;
122use std::task::{Context, Poll};
123
124use tracing::{instrument, trace};
125
126// reexport
127pub use super::client::channel::handshake::ClientInitiatorHandshake;
128#[cfg(feature = "relay")]
129pub use super::relay::channel::handshake::RelayInitiatorHandshake;
130use crate::channel::unique_id::CircUniqIdContext;
131
132use kist::KistParams;
133
134/// This indicate what type of channel it is. It allows us to decide for the correct channel cell
135/// state machines and authentication process (if any).
136///
137/// It is created when a channel is requested for creation which means the subsystem wanting to
138/// open a channel needs to know what type it wants.
139#[derive(Clone, Copy, Debug, derive_more::Display)]
140#[non_exhaustive]
141pub enum ChannelType {
142    /// Client: Initiated from a client to a relay. Client is unauthenticated and relay is
143    /// authenticated.
144    ClientInitiator,
145    /// Relay: Initiating as a relay to a relay. Both sides are authenticated.
146    RelayInitiator,
147    /// Relay: Responding as a relay to a relay or client. Authenticated or Unauthenticated.
148    RelayResponder {
149        /// Indicate if the channel is authenticated. Responding as a relay can be either from a
150        /// Relay (authenticated) or a Client/Bridge (Unauthenticated). We only know this
151        /// information once the handshake is completed.
152        ///
153        /// This side is always authenticated, the other side can be if a relay or not if
154        /// bridge/client. This is set to false unless we end up authenticating the other side
155        /// meaning a relay.
156        authenticated: bool,
157    },
158}
159
160impl ChannelType {
161    /// Set that this channel type is now authenticated. This only applies to RelayResponder.
162    pub(crate) fn set_authenticated(&mut self) {
163        if let Self::RelayResponder { authenticated } = self {
164            *authenticated = true;
165        }
166    }
167}
168
169/// A channel cell frame used for sending and receiving cells on a channel. The handler takes care
170/// of the cell codec transition depending in which state the channel is.
171///
172/// ChannelFrame is used to basically handle all in and outbound cells on a channel for its entire
173/// lifetime.
174pub(crate) type ChannelFrame<T> = asynchronous_codec::Framed<T, handler::ChannelCellHandler>;
175
176/// An entry in a channel's queue of cells to be flushed.
177pub(crate) type ChanCellQueueEntry = (AnyChanCell, Option<QueuedCellPaddingInfo>);
178
179/// Helper: Return a new channel frame [ChannelFrame] from an object implementing AsyncRead + AsyncWrite. In the
180/// tor context, it is always a TLS stream.
181///
182/// The ty (type) argument needs to be able to transform into a [handler::ChannelCellHandler] which would
183/// generally be a [ChannelType].
184pub(crate) fn new_frame<T, I>(tls: T, ty: I) -> ChannelFrame<T>
185where
186    T: AsyncRead + AsyncWrite,
187    I: Into<handler::ChannelCellHandler>,
188{
189    let mut framed = asynchronous_codec::Framed::new(tls, ty.into());
190    framed.set_send_high_water_mark(32 * 1024);
191    framed
192}
193
194/// Canonical state between this channel and its peer. This is inferred from the [`Netinfo`]
195/// received during the channel handshake.
196///
197/// A connection is "canonical" if the TCP connection's peer IP address matches an address
198/// that the relay itself claims in its [`Netinfo`] cell.
199#[derive(Debug)]
200pub(crate) struct Canonicity {
201    /// The peer has proven this connection is canonical for its address: at least one NETINFO "my
202    /// address" matches the observed TCP peer address.
203    pub(crate) peer_is_canonical: bool,
204    /// We appear canonical from the peer's perspective: its NETINFO "other address" matches our
205    /// advertised relay address.
206    pub(crate) canonical_to_peer: bool,
207}
208
209impl Canonicity {
210    /// Using a [`Netinfo`], build the canonicity object with the given addresses.
211    ///
212    /// The `my_addrs` are the advertised address of this relay or empty if a client/bridge as they
213    /// do not advertise or expose a reachable address.
214    ///
215    /// The `peer_addr` is the IP address we believe the peer has. In other words, it is either the
216    /// IP we used to connect to or the address we see in the accept() phase of the connection.
217    pub(crate) fn from_netinfo(netinfo: &Netinfo, my_addrs: &[IpAddr], peer_addr: IpAddr) -> Self {
218        Self {
219            // The "other addr" (our address as seen by the peer) matches the one we advertised.
220            canonical_to_peer: netinfo
221                .their_addr()
222                .is_some_and(|a: &IpAddr| my_addrs.contains(a)),
223            // The "my addresses" (the peer addresses that it claims to have) matches the one we
224            // see on the connection or that we attempted to connect to.
225            peer_is_canonical: netinfo.my_addrs().contains(&peer_addr),
226        }
227    }
228
229    /// Construct a fully canonical object.
230    #[cfg(any(test, feature = "testing"))]
231    pub(crate) fn new_canonical() -> Self {
232        Self {
233            peer_is_canonical: true,
234            canonical_to_peer: true,
235        }
236    }
237}
238
239/// An open client channel, ready to send and receive Tor cells.
240///
241/// A channel is a direct connection to a Tor relay, implemented using TLS.
242///
243/// This struct is a frontend that can be used to send cells
244/// and otherwise control the channel.  The main state is
245/// in the Reactor object.
246///
247/// (Users need a mutable reference because of the types in `Sink`, and
248/// ultimately because `cell_tx: mpsc::Sender` doesn't work without mut.
249///
250/// # Channel life cycle
251///
252/// Channels can be created directly here through the [`ChannelBuilder`] API.
253/// For a higher-level API (with better support for TLS, pluggable transports,
254/// and channel reuse) see the `tor-chanmgr` crate.
255///
256/// After a channel is created, it will persist until it is closed in one of
257/// four ways:
258///    1. A remote error occurs.
259///    2. The other side of the channel closes the channel.
260///    3. Someone calls [`Channel::terminate`] on the channel.
261///    4. The last reference to the `Channel` is dropped. (Note that every circuit
262///       on a `Channel` keeps a reference to it, which will in turn keep the
263///       channel from closing until all those circuits have gone away.)
264///
265/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
266/// will just be unusable for most purposes.  Most operations on it will fail
267/// with an error.
268#[derive(Debug)]
269pub struct Channel {
270    /// The channel type.
271    #[expect(unused)] // TODO: Remove once used.
272    channel_type: ChannelType,
273    /// A channel used to send control messages to the Reactor.
274    control: mpsc::UnboundedSender<CtrlMsg>,
275    /// A channel used to send cells to the Reactor.
276    cell_tx: CellTx,
277
278    /// A receiver that indicates whether the channel is closed.
279    ///
280    /// Awaiting will return a `CancelledError` event when the reactor is dropped.
281    /// Read to decide if operations may succeed, and is returned by `wait_for_close`.
282    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
283
284    /// Padding controller, used to report when data is queued for this channel.
285    padding_ctrl: PaddingController,
286
287    /// A unique identifier for this channel.
288    unique_id: UniqId,
289    /// Validated identity and address information for this peer.
290    peer_id: OwnedChanTarget,
291    /// The declared clock skew on this channel, at the time when this channel was
292    /// created.
293    clock_skew: ClockSkew,
294    /// The time when this channel was successfully completed
295    opened_at: coarsetime::Instant,
296    /// Mutable state used by the `Channel.
297    mutable: Mutex<MutableDetails>,
298    /// Information shared with the reactor
299    details: Arc<ChannelDetails>,
300    /// Canonicity of this channel.
301    canonicity: Canonicity,
302}
303
304/// This is information shared between the reactor and the frontend (`Channel` object).
305///
306/// `control` can't be here because we rely on it getting dropped when the last user goes away.
307#[derive(Debug)]
308pub(crate) struct ChannelDetails {
309    /// Since when the channel became unused.
310    ///
311    /// If calling `time_since_update` returns None,
312    /// this channel is still in use by at least one circuit.
313    ///
314    /// Set by reactor when a circuit is added or removed.
315    /// Read from `Channel::duration_unused`.
316    unused_since: AtomicOptTimestamp,
317    /// Memory quota account
318    ///
319    /// This is here partly because we need to ensure it lives as long as the channel,
320    /// as otherwise the memquota system will tear the account down.
321    #[allow(dead_code)]
322    memquota: ChannelAccount,
323}
324
325/// Mutable details (state) used by the `Channel` (frontend)
326#[derive(Debug, Default)]
327struct MutableDetails {
328    /// State used to control padding
329    padding: PaddingControlState,
330}
331
332/// State used to control padding
333///
334/// We store this here because:
335///
336///  1. It must be per-channel, because it depends on channel usage.  So it can't be in
337///     (for example) `ChannelPaddingInstructionsUpdate`.
338///
339///  2. It could be in the channel manager's per-channel state but (for code flow reasons
340///     there, really) at the point at which the channel manager concludes for a pending
341///     channel that it ought to update the usage, it has relinquished the lock on its own data
342///     structure.
343///     And there is actually no need for this to be global: a per-channel lock is better than
344///     reacquiring the global one.
345///
346///  3. It doesn't want to be in the channel reactor since that's super hot.
347///
348/// See also the overview at [`tor_proto::channel::padding`](padding)
349#[derive(Debug, Educe)]
350#[educe(Default)]
351enum PaddingControlState {
352    /// No usage of this channel, so far, implies sending or negotiating channel padding.
353    ///
354    /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
355    /// with the following consequences:
356    ///
357    ///  * We don't enable our own padding.
358    ///  * We don't do any work to change the timeout distribution in the padding timer,
359    ///    (which is fine since this timer is not enabled).
360    ///  * We don't send any PADDING_NEGOTIATE cells.  The peer is supposed to come to the
361    ///    same conclusions as us, based on channel usage: it should also not send padding.
362    #[educe(Default)]
363    UsageDoesNotImplyPadding {
364        /// The last padding parameters (from reparameterize)
365        ///
366        /// We keep this so that we can send it if and when
367        /// this channel starts to be used in a way that implies (possibly) sending padding.
368        padding_params: ChannelPaddingInstructionsUpdates,
369    },
370
371    /// Some usage of this channel implies possibly sending channel padding
372    ///
373    /// The required padding timer, negotiation cell, etc.,
374    /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
375    ///
376    /// Once we have set this variant, it remains this way forever for this channel,
377    /// (the spec speaks of channels "only used for" certain purposes not getting padding).
378    PaddingConfigured,
379}
380
381use PaddingControlState as PCS;
382
383cfg_if! {
384    if #[cfg(feature="circ-padding")] {
385        /// Implementation type for a ChannelSender.
386        type CellTx = CountingSink<mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>>;
387
388        /// Implementation type for a cell queue held by a reactor.
389        type CellRx = CountingStream<mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>>;
390    } else {
391        /// Implementation type for a ChannelSender.
392        type CellTx = mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>;
393
394        /// Implementation type for a cell queue held by a reactor.
395        type CellRx = mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>;
396    }
397}
398
399/// A handle to a [`Channel`]` that can be used, by circuits, to send channel cells.
400#[derive(Debug)]
401pub(crate) struct ChannelSender {
402    /// MPSC sender to send cells.
403    cell_tx: CellTx,
404    /// A receiver used to check if the channel is closed.
405    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
406    /// Unique ID for this channel. For logging.
407    unique_id: UniqId,
408    /// Padding controller for this channel:
409    /// used to report when we queue data that will eventually wind up on the channel.
410    padding_ctrl: PaddingController,
411}
412
413impl ChannelSender {
414    /// Check whether a cell type is permissible to be _sent_ on an
415    /// open client channel.
416    fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
417        use tor_cell::chancell::msg::AnyChanMsg::*;
418        let msg = cell.msg();
419        match msg {
420            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
421                "Can't send {} cell on client channel",
422                msg.cmd()
423            ))),
424            Certs(_) | Versions(_) | Authenticate(_) | AuthChallenge(_) | Netinfo(_) => {
425                Err(Error::from(internal!(
426                    "Can't send {} cell after handshake is done",
427                    msg.cmd()
428                )))
429            }
430            _ => Ok(()),
431        }
432    }
433
434    /// Obtain a reference to the `ChannelSender`'s [`DynTimeProvider`]
435    ///
436    /// (This can sometimes be used to avoid having to keep
437    /// a separate clone of the time provider.)
438    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
439        cfg_if! {
440            if #[cfg(feature="circ-padding")] {
441                self.cell_tx.inner().time_provider()
442            } else {
443                self.cell_tx.time_provider()
444            }
445        }
446    }
447
448    /// Return an approximate count of the number of outbound cells queued for this channel.
449    ///
450    /// This count is necessarily approximate,
451    /// because the underlying count can be modified by other senders and receivers
452    /// between when this method is called and when its return value is used.
453    ///
454    /// Does not include cells that have already been passed to the TLS connection.
455    ///
456    /// Circuit padding uses this count to determine
457    /// when messages are already outbound for the first hop of a circuit.
458    #[cfg(feature = "circ-padding")]
459    pub(crate) fn approx_count(&self) -> usize {
460        self.cell_tx.approx_count()
461    }
462
463    /// Note that a cell has been queued that will eventually be placed onto this sender.
464    ///
465    /// We use this as an input for padding machines.
466    pub(crate) fn note_cell_queued(&self) {
467        self.padding_ctrl.queued_data(crate::HopNum::from(0));
468    }
469}
470
471impl Sink<ChanCellQueueEntry> for ChannelSender {
472    type Error = Error;
473
474    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
475        let this = self.get_mut();
476        Pin::new(&mut this.cell_tx)
477            .poll_ready(cx)
478            .map_err(|_| ChannelClosed.into())
479    }
480
481    fn start_send(self: Pin<&mut Self>, cell: ChanCellQueueEntry) -> Result<()> {
482        let this = self.get_mut();
483        if this.reactor_closed_rx.is_ready() {
484            return Err(ChannelClosed.into());
485        }
486        this.check_cell(&cell.0)?;
487        {
488            use tor_cell::chancell::msg::AnyChanMsg::*;
489            match cell.0.msg() {
490                Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
491                _ => trace!(
492                    channel_id = %this.unique_id,
493                    "Sending {} for {}",
494                    cell.0.msg().cmd(),
495                    CircId::get_or_zero(cell.0.circid())
496                ),
497            }
498        }
499
500        Pin::new(&mut this.cell_tx)
501            .start_send(cell)
502            .map_err(|_| ChannelClosed.into())
503    }
504
505    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
506        let this = self.get_mut();
507        Pin::new(&mut this.cell_tx)
508            .poll_flush(cx)
509            .map_err(|_| ChannelClosed.into())
510    }
511
512    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
513        let this = self.get_mut();
514        Pin::new(&mut this.cell_tx)
515            .poll_close(cx)
516            .map_err(|_| ChannelClosed.into())
517    }
518}
519
520/// Structure for building and launching a Tor channel.
521//
522// TODO(relay): Remove this as we now have ClientChannelBuilder and soon RelayChannelBuilder.
523#[derive(Default)]
524pub struct ChannelBuilder {
525    /// If present, a description of the address we're trying to connect to,
526    /// and the way in which we are trying to connect to it.
527    ///
528    /// TODO: at some point, check this against the addresses in the netinfo
529    /// cell too.
530    target: Option<tor_linkspec::ChannelMethod>,
531}
532
533impl ChannelBuilder {
534    /// Construct a new ChannelBuilder.
535    pub fn new() -> Self {
536        ChannelBuilder::default()
537    }
538
539    /// Set the declared target method of this channel to correspond to a direct
540    /// connection to a given socket address.
541    #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
542    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
543        self.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![target]));
544    }
545
546    /// Set the declared target method of this channel.
547    ///
548    /// Note that nothing enforces the correctness of this method: it
549    /// doesn't have to match the real method used to create the TLS
550    /// stream.
551    pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
552        self.target = Some(target);
553    }
554
555    /// Launch a new client handshake over a TLS stream.
556    ///
557    /// After calling this function, you'll need to call `connect()` on
558    /// the result to start the handshake.  If that succeeds, you'll have
559    /// authentication info from the relay: call `check()` on the result
560    /// to check that.  Finally, to finish the handshake, call `finish()`
561    /// on the result of _that_.
562    pub fn launch_client<T, S>(
563        self,
564        tls: T,
565        sleep_prov: S,
566        memquota: ChannelAccount,
567    ) -> ClientInitiatorHandshake<T, S>
568    where
569        T: AsyncRead + AsyncWrite + StreamOps + Send + Unpin + 'static,
570        S: CoarseTimeProvider + SleepProvider,
571    {
572        // TODO(relay): We could just make the target be taken as a parameter instead of using a
573        // setter that is also replicated on the client builder? Food for thought on refactor here.
574        let mut builder = ClientChannelBuilder::new();
575        if let Some(target) = self.target {
576            builder.set_declared_method(target);
577        }
578        builder.launch(tls, sleep_prov, memquota)
579    }
580}
581
582impl Channel {
583    /// Construct a channel and reactor.
584    ///
585    /// Internal method, called to finalize the channel when we've
586    /// sent our netinfo cell, received the peer's netinfo cell, and
587    /// we're finally ready to create circuits.
588    ///
589    /// Quick note on the allow clippy. This is has one call site so for now, it is fine that we
590    /// bust the mighty 7 arguments.
591    #[allow(clippy::too_many_arguments)] // TODO consider if we want a builder
592    fn new<S>(
593        channel_type: ChannelType,
594        link_protocol: u16,
595        sink: BoxedChannelSink,
596        stream: BoxedChannelStream,
597        streamops: BoxedChannelStreamOps,
598        unique_id: UniqId,
599        peer_id: OwnedChanTarget,
600        clock_skew: ClockSkew,
601        sleep_prov: S,
602        memquota: ChannelAccount,
603        canonicity: Canonicity,
604    ) -> Result<(Arc<Self>, reactor::Reactor<S>)>
605    where
606        S: CoarseTimeProvider + SleepProvider,
607    {
608        use circmap::{CircIdRange, CircMap};
609        let circmap = CircMap::new(CircIdRange::High);
610        let dyn_time = DynTimeProvider::new(sleep_prov.clone());
611
612        let (control_tx, control_rx) = mpsc::unbounded();
613        let (cell_tx, cell_rx) = mq_queue::MpscSpec::new(CHANNEL_BUFFER_SIZE)
614            .new_mq(dyn_time.clone(), memquota.as_raw_account())?;
615        #[cfg(feature = "circ-padding")]
616        let (cell_tx, cell_rx) = counting_streams::channel(cell_tx, cell_rx);
617        let unused_since = AtomicOptTimestamp::new();
618        unused_since.update();
619
620        let mutable = MutableDetails::default();
621        let (reactor_closed_tx, reactor_closed_rx) = oneshot_broadcast::channel();
622
623        let details = ChannelDetails {
624            unused_since,
625            memquota,
626        };
627        let details = Arc::new(details);
628
629        // We might be using experimental maybenot padding; this creates the padding framework for that.
630        //
631        // TODO: This backend is currently optimized for circuit padding,
632        // so it might allocate a bit more than necessary to account for multiple hops.
633        // We should tune it when we deploy padding in production.
634        let (padding_ctrl, padding_event_stream) =
635            client::circuit::padding::new_padding(DynTimeProvider::new(sleep_prov.clone()));
636
637        let channel = Arc::new(Channel {
638            channel_type,
639            control: control_tx,
640            cell_tx,
641            reactor_closed_rx,
642            padding_ctrl: padding_ctrl.clone(),
643            unique_id,
644            peer_id,
645            clock_skew,
646            opened_at: coarsetime::Instant::now(),
647            mutable: Mutex::new(mutable),
648            details: Arc::clone(&details),
649            canonicity,
650        });
651
652        // We start disabled; the channel manager will `reconfigure` us soon after creation.
653        let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov.clone(), None)?);
654
655        cfg_if! {
656            if #[cfg(feature = "circ-padding")] {
657                use crate::util::sink_blocker::{SinkBlocker,CountingPolicy};
658                let sink = SinkBlocker::new(sink, CountingPolicy::new_unlimited());
659            }
660        }
661
662        let reactor = Reactor {
663            runtime: sleep_prov,
664            control: control_rx,
665            cells: cell_rx,
666            reactor_closed_tx,
667            input: futures::StreamExt::fuse(stream),
668            output: sink,
669            streamops,
670            circs: circmap,
671            circ_unique_id_ctx: CircUniqIdContext::new(),
672            link_protocol,
673            unique_id,
674            details,
675            padding_timer,
676            padding_ctrl,
677            padding_event_stream,
678            padding_blocker: None,
679            special_outgoing: Default::default(),
680        };
681
682        Ok((channel, reactor))
683    }
684
685    /// Return a process-unique identifier for this channel.
686    pub fn unique_id(&self) -> UniqId {
687        self.unique_id
688    }
689
690    /// Return a reference to the memory tracking account for this Channel
691    pub fn mq_account(&self) -> &ChannelAccount {
692        &self.details.memquota
693    }
694
695    /// Obtain a reference to the `Channel`'s [`DynTimeProvider`]
696    ///
697    /// (This can sometimes be used to avoid having to keep
698    /// a separate clone of the time provider.)
699    pub fn time_provider(&self) -> &DynTimeProvider {
700        cfg_if! {
701            if #[cfg(feature="circ-padding")] {
702                self.cell_tx.inner().time_provider()
703            } else {
704                self.cell_tx.time_provider()
705            }
706        }
707    }
708
709    /// Return an OwnedChanTarget representing the actual handshake used to
710    /// create this channel.
711    pub fn target(&self) -> &OwnedChanTarget {
712        &self.peer_id
713    }
714
715    /// Return the amount of time that has passed since this channel became open.
716    pub fn age(&self) -> Duration {
717        self.opened_at.elapsed().into()
718    }
719
720    /// Return a ClockSkew declaring how much clock skew the other side of this channel
721    /// claimed that we had when we negotiated the connection.
722    pub fn clock_skew(&self) -> ClockSkew {
723        self.clock_skew
724    }
725
726    /// Send a control message
727    #[instrument(level = "trace", skip_all)]
728    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
729        self.control
730            .unbounded_send(msg)
731            .map_err(|_| ChannelClosed)?;
732        Ok(())
733    }
734
735    /// Acquire the lock on `mutable` (and handle any poison error)
736    fn mutable(&self) -> MutexGuard<MutableDetails> {
737        self.mutable.lock().expect("channel details poisoned")
738    }
739
740    /// Specify that this channel should do activities related to channel padding
741    ///
742    /// Initially, the channel does nothing related to channel padding:
743    /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
744    ///
745    /// After this function has been called, it will do both,
746    /// according to the parameters specified through `reparameterize`.
747    /// Note that this might include *disabling* padding
748    /// (for example, by sending a `PADDING_NEGOTIATE`).
749    ///
750    /// Idempotent.
751    ///
752    /// There is no way to undo the effect of this call.
753    #[instrument(level = "trace", skip_all)]
754    pub fn engage_padding_activities(&self) {
755        let mut mutable = self.mutable();
756
757        match &mutable.padding {
758            PCS::UsageDoesNotImplyPadding {
759                padding_params: params,
760            } => {
761                // Well, apparently the channel usage *does* imply padding now,
762                // so we need to (belatedly) enable the timer,
763                // send the padding negotiation cell, etc.
764                let mut params = params.clone();
765
766                // Except, maybe the padding we would be requesting is precisely default,
767                // so we wouldn't actually want to send that cell.
768                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
769                    params.padding_negotiate = None;
770                }
771
772                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
773                    Ok(()) => {}
774                    Err(ChannelClosed) => return,
775                }
776
777                mutable.padding = PCS::PaddingConfigured;
778            }
779
780            PCS::PaddingConfigured => {
781                // OK, nothing to do
782            }
783        }
784
785        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
786    }
787
788    /// Reparameterise (update parameters; reconfigure)
789    ///
790    /// Returns `Err` if the channel was closed earlier
791    #[instrument(level = "trace", skip_all)]
792    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
793        let mut mutable = self
794            .mutable
795            .lock()
796            .map_err(|_| internal!("channel details poisoned"))?;
797
798        match &mut mutable.padding {
799            PCS::PaddingConfigured => {
800                self.send_control(CtrlMsg::ConfigUpdate(params))?;
801            }
802            PCS::UsageDoesNotImplyPadding { padding_params } => {
803                padding_params.combine(&params);
804            }
805        }
806
807        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
808        Ok(())
809    }
810
811    /// Update the KIST parameters.
812    ///
813    /// Returns `Err` if the channel is closed.
814    #[instrument(level = "trace", skip_all)]
815    pub fn reparameterize_kist(&self, kist_params: KistParams) -> Result<()> {
816        Ok(self.send_control(CtrlMsg::KistConfigUpdate(kist_params))?)
817    }
818
819    /// Return an error if this channel is somehow mismatched with the
820    /// given target.
821    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
822        check_id_match_helper(&self.peer_id, target)
823    }
824
825    /// Return true if this channel is closed and therefore unusable.
826    pub fn is_closing(&self) -> bool {
827        self.reactor_closed_rx.is_ready()
828    }
829
830    /// Return true iff this channel is considered canonical by us.
831    pub fn is_canonical(&self) -> bool {
832        self.canonicity.peer_is_canonical
833    }
834
835    /// Return true if we think the peer considers this channel as canonical.
836    pub fn is_canonical_to_peer(&self) -> bool {
837        self.canonicity.canonical_to_peer
838    }
839
840    /// If the channel is not in use, return the amount of time
841    /// it has had with no circuits.
842    ///
843    /// Return `None` if the channel is currently in use.
844    pub fn duration_unused(&self) -> Option<std::time::Duration> {
845        self.details
846            .unused_since
847            .time_since_update()
848            .map(Into::into)
849    }
850
851    /// Return a new [`ChannelSender`] to transmit cells on this channel.
852    pub(crate) fn sender(&self) -> ChannelSender {
853        ChannelSender {
854            cell_tx: self.cell_tx.clone(),
855            reactor_closed_rx: self.reactor_closed_rx.clone(),
856            unique_id: self.unique_id,
857            padding_ctrl: self.padding_ctrl.clone(),
858        }
859    }
860
861    /// Return a newly allocated PendingClientTunnel object with
862    /// a corresponding tunnel reactor. A circuit ID is allocated, but no
863    /// messages are sent, and no cryptography is done.
864    ///
865    /// To use the results of this method, call Reactor::run() in a
866    /// new task, then use the methods of
867    /// [crate::client::circuit::PendingClientTunnel] to build the circuit.
868    #[instrument(level = "trace", skip_all)]
869    pub async fn new_tunnel(
870        self: &Arc<Self>,
871        timeouts: Arc<dyn TimeoutEstimator>,
872    ) -> Result<(PendingClientTunnel, client::reactor::Reactor)> {
873        if self.is_closing() {
874            return Err(ChannelClosed.into());
875        }
876
877        let time_prov = self.time_provider().clone();
878        let memquota = CircuitAccount::new(&self.details.memquota)?;
879
880        // TODO: blocking is risky, but so is unbounded.
881        let (sender, receiver) =
882            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
883        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
884
885        let (tx, rx) = oneshot::channel();
886        self.send_control(CtrlMsg::AllocateCircuit {
887            created_sender: createdsender,
888            sender,
889            tx,
890        })?;
891        let (id, circ_unique_id, padding_ctrl, padding_stream) =
892            rx.await.map_err(|_| ChannelClosed)??;
893
894        trace!("{}: Allocated CircId {}", circ_unique_id, id);
895
896        Ok(PendingClientTunnel::new(
897            id,
898            self.clone(),
899            createdreceiver,
900            receiver,
901            circ_unique_id,
902            time_prov,
903            memquota,
904            padding_ctrl,
905            padding_stream,
906            timeouts,
907        ))
908    }
909
910    /// Return a newly allocated outbound relay circuit with.
911    ///
912    /// A circuit ID is allocated, but no messages are sent, and no cryptography is done.
913    ///
914    // TODO(relay): this duplicates much of new_tunnel above, but I expect
915    // the implementations to diverge once we introduce a new CtrlMsg for
916    // allocating relay circuits.
917    #[cfg(feature = "relay")]
918    pub(crate) async fn new_outbound_circ(
919        self: &Arc<Self>,
920    ) -> Result<(CircId, CircuitRxReceiver, oneshot::Receiver<CreateResponse>)> {
921        if self.is_closing() {
922            return Err(ChannelClosed.into());
923        }
924
925        let time_prov = self.time_provider().clone();
926        let memquota = CircuitAccount::new(&self.details.memquota)?;
927
928        // TODO: blocking is risky, but so is unbounded.
929        let (sender, receiver) =
930            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
931        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
932
933        let (tx, rx) = oneshot::channel();
934
935        self.send_control(CtrlMsg::AllocateCircuit {
936            created_sender: createdsender,
937            sender,
938            tx,
939        })?;
940
941        // TODO(relay): I don't think we need circuit-level padding on this side of the circuit.
942        // This just drops the padding controller and corresponding event stream,
943        // but maybe it would be better to just not set it up in the first place?
944        // This suggests we might need a different control command for allocating
945        // the outbound relay circuits...
946        let (id, circ_unique_id, _padding_ctrl, _padding_stream) =
947            rx.await.map_err(|_| ChannelClosed)??;
948
949        trace!("{}: Allocated CircId {}", circ_unique_id, id);
950
951        Ok((id, receiver, createdreceiver))
952    }
953
954    /// Shut down this channel immediately, along with all circuits that
955    /// are using it.
956    ///
957    /// Note that other references to this channel may exist.  If they
958    /// do, they will stop working after you call this function.
959    ///
960    /// It's not necessary to call this method if you're just done
961    /// with a channel: the channel should close on its own once nothing
962    /// is using it any more.
963    #[instrument(level = "trace", skip_all)]
964    pub fn terminate(&self) {
965        let _ = self.send_control(CtrlMsg::Shutdown);
966    }
967
968    /// Tell the reactor that the circuit with the given ID has gone away.
969    #[instrument(level = "trace", skip_all)]
970    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
971        self.send_control(CtrlMsg::CloseCircuit(circid))?;
972        Ok(())
973    }
974
975    /// Return a future that will resolve once this channel has closed.
976    ///
977    /// Note that this method does not _cause_ the channel to shut down on its own.
978    pub fn wait_for_close(
979        &self,
980    ) -> impl Future<Output = StdResult<CloseInfo, ClosedUnexpectedly>> + Send + Sync + 'static + use<>
981    {
982        self.reactor_closed_rx
983            .clone()
984            .into_future()
985            .map(|recv| match recv {
986                Ok(Ok(info)) => Ok(info),
987                Ok(Err(e)) => Err(ClosedUnexpectedly::ReactorError(e)),
988                Err(oneshot_broadcast::SenderDropped) => Err(ClosedUnexpectedly::ReactorDropped),
989            })
990    }
991
992    /// Install a [`CircuitPadder`](client::CircuitPadder) for this channel.
993    ///
994    /// Replaces any previous padder installed.
995    #[cfg(feature = "circ-padding-manual")]
996    pub async fn start_padding(self: &Arc<Self>, padder: client::CircuitPadder) -> Result<()> {
997        self.set_padder_impl(Some(padder)).await
998    }
999
1000    /// Remove any [`CircuitPadder`](client::CircuitPadder) installed for this channel.
1001    ///
1002    /// Does nothing if there was not a padder installed there.
1003    #[cfg(feature = "circ-padding-manual")]
1004    pub async fn stop_padding(self: &Arc<Self>) -> Result<()> {
1005        self.set_padder_impl(None).await
1006    }
1007
1008    /// Replace the [`CircuitPadder`](client::CircuitPadder) installed for this channel with `padder`.
1009    #[cfg(feature = "circ-padding-manual")]
1010    async fn set_padder_impl(
1011        self: &Arc<Self>,
1012        padder: Option<client::CircuitPadder>,
1013    ) -> Result<()> {
1014        let (tx, rx) = oneshot::channel();
1015        let msg = CtrlMsg::SetChannelPadder { padder, sender: tx };
1016        self.control
1017            .unbounded_send(msg)
1018            .map_err(|_| Error::ChannelClosed(ChannelClosed))?;
1019        rx.await.map_err(|_| Error::ChannelClosed(ChannelClosed))?
1020    }
1021
1022    /// Make a new fake reactor-less channel.  For testing only, obviously.
1023    ///
1024    /// Returns the receiver end of the control message mpsc.
1025    ///
1026    /// Suitable for external callers who want to test behaviour
1027    /// of layers including the logic in the channel frontend
1028    /// (`Channel` object methods).
1029    //
1030    // This differs from test::fake_channel as follows:
1031    //  * It returns the mpsc Receiver
1032    //  * It does not require explicit specification of details
1033    #[cfg(feature = "testing")]
1034    pub fn new_fake(
1035        rt: impl SleepProvider + CoarseTimeProvider,
1036        channel_type: ChannelType,
1037    ) -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
1038        let (control, control_recv) = mpsc::unbounded();
1039        let details = fake_channel_details();
1040
1041        let unique_id = UniqId::new();
1042        let peer_id = OwnedChanTarget::builder()
1043            .ed_identity([6_u8; 32].into())
1044            .rsa_identity([10_u8; 20].into())
1045            .build()
1046            .expect("Couldn't construct peer id");
1047
1048        // This will make rx trigger immediately.
1049        let (_tx, rx) = oneshot_broadcast::channel();
1050        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
1051
1052        let channel = Channel {
1053            channel_type,
1054            control,
1055            cell_tx: fake_mpsc().0,
1056            reactor_closed_rx: rx,
1057            padding_ctrl,
1058            unique_id,
1059            peer_id,
1060            clock_skew: ClockSkew::None,
1061            opened_at: coarsetime::Instant::now(),
1062            mutable: Default::default(),
1063            details,
1064            canonicity: Canonicity::new_canonical(),
1065        };
1066        (channel, control_recv)
1067    }
1068}
1069
1070/// If there is any identity in `wanted_ident` that is not present in
1071/// `my_ident`, return a ChanMismatch error.
1072///
1073/// This is a helper for [`Channel::check_match`] and
1074/// UnverifiedChannel::check_internal.
1075fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
1076where
1077    T: HasRelayIds + ?Sized,
1078    U: HasRelayIds + ?Sized,
1079{
1080    for desired in wanted_ident.identities() {
1081        let id_type = desired.id_type();
1082        match my_ident.identity(id_type) {
1083            Some(actual) if actual == desired => {}
1084            Some(actual) => {
1085                return Err(Error::ChanMismatch(format!(
1086                    "Identity {} does not match target {}",
1087                    sv(actual),
1088                    sv(desired)
1089                )));
1090            }
1091            None => {
1092                return Err(Error::ChanMismatch(format!(
1093                    "Peer does not have {} identity",
1094                    id_type
1095                )));
1096            }
1097        }
1098    }
1099    Ok(())
1100}
1101
1102impl HasRelayIds for Channel {
1103    fn identity(
1104        &self,
1105        key_type: tor_linkspec::RelayIdType,
1106    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
1107        self.peer_id.identity(key_type)
1108    }
1109}
1110
1111/// The status of a channel which was closed successfully.
1112///
1113/// **Note:** This doesn't have any associated data,
1114/// but may be expanded in the future.
1115// I can't think of any info we'd want to return to waiters,
1116// but this type leaves the possibility open without requiring any backwards-incompatible changes.
1117#[derive(Clone, Debug)]
1118#[non_exhaustive]
1119pub struct CloseInfo;
1120
1121/// The status of a channel which closed unexpectedly.
1122#[derive(Clone, Debug, thiserror::Error)]
1123#[non_exhaustive]
1124pub enum ClosedUnexpectedly {
1125    /// The channel reactor was dropped or panicked before completing.
1126    #[error("channel reactor was dropped or panicked before completing")]
1127    ReactorDropped,
1128    /// The channel reactor had an internal error.
1129    #[error("channel reactor had an internal error")]
1130    ReactorError(Error),
1131}
1132
1133/// Make some fake channel details (for testing only!)
1134#[cfg(any(test, feature = "testing"))]
1135fn fake_channel_details() -> Arc<ChannelDetails> {
1136    let unused_since = AtomicOptTimestamp::new();
1137
1138    Arc::new(ChannelDetails {
1139        unused_since,
1140        memquota: crate::util::fake_mq(),
1141    })
1142}
1143
1144/// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
1145#[cfg(any(test, feature = "testing"))] // Used by Channel::new_fake which is also feature=testing
1146pub(crate) fn fake_mpsc() -> (CellTx, CellRx) {
1147    let (tx, rx) = crate::fake_mpsc(CHANNEL_BUFFER_SIZE);
1148    #[cfg(feature = "circ-padding")]
1149    let (tx, rx) = counting_streams::channel(tx, rx);
1150    (tx, rx)
1151}
1152
1153#[cfg(test)]
1154pub(crate) mod test {
1155    // Most of this module is tested via tests that also check on the
1156    // reactor code; there are just a few more cases to examine here.
1157    #![allow(clippy::unwrap_used)]
1158    use super::*;
1159    use crate::channel::handler::test::MsgBuf;
1160    pub(crate) use crate::channel::reactor::test::{CodecResult, new_reactor};
1161    use crate::util::fake_mq;
1162    use tor_cell::chancell::msg::HandshakeType;
1163    use tor_cell::chancell::{AnyChanCell, msg};
1164    use tor_rtcompat::{PreferredRuntime, test_with_one_runtime};
1165
1166    /// Make a new fake reactor-less channel.  For testing only, obviously.
1167    pub(crate) fn fake_channel(
1168        rt: impl SleepProvider + CoarseTimeProvider,
1169        channel_type: ChannelType,
1170    ) -> Channel {
1171        let unique_id = UniqId::new();
1172        let peer_id = OwnedChanTarget::builder()
1173            .ed_identity([6_u8; 32].into())
1174            .rsa_identity([10_u8; 20].into())
1175            .build()
1176            .expect("Couldn't construct peer id");
1177        // This will make rx trigger immediately.
1178        let (_tx, rx) = oneshot_broadcast::channel();
1179        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
1180        Channel {
1181            channel_type,
1182            control: mpsc::unbounded().0,
1183            cell_tx: fake_mpsc().0,
1184            reactor_closed_rx: rx,
1185            padding_ctrl,
1186            unique_id,
1187            peer_id,
1188            clock_skew: ClockSkew::None,
1189            opened_at: coarsetime::Instant::now(),
1190            mutable: Default::default(),
1191            details: fake_channel_details(),
1192            canonicity: Canonicity::new_canonical(),
1193        }
1194    }
1195
1196    #[test]
1197    fn send_bad() {
1198        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1199            use std::error::Error;
1200            let chan = fake_channel(rt, ChannelType::ClientInitiator);
1201
1202            let cell = AnyChanCell::new(CircId::new(7), msg::Created2::new(&b"hihi"[..]).into());
1203            let e = chan.sender().check_cell(&cell);
1204            assert!(e.is_err());
1205            assert!(
1206                format!("{}", e.unwrap_err().source().unwrap())
1207                    .contains("Can't send CREATED2 cell on client channel")
1208            );
1209            let cell = AnyChanCell::new(None, msg::Certs::new_empty().into());
1210            let e = chan.sender().check_cell(&cell);
1211            assert!(e.is_err());
1212            assert!(
1213                format!("{}", e.unwrap_err().source().unwrap())
1214                    .contains("Can't send CERTS cell after handshake is done")
1215            );
1216
1217            let cell = AnyChanCell::new(
1218                CircId::new(5),
1219                msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
1220            );
1221            let e = chan.sender().check_cell(&cell);
1222            assert!(e.is_ok());
1223            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
1224            // let got = output.next().await.unwrap();
1225            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
1226        });
1227    }
1228
1229    #[test]
1230    fn chanbuilder() {
1231        let rt = PreferredRuntime::create().unwrap();
1232        let mut builder = ChannelBuilder::default();
1233        builder.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![
1234            "127.0.0.1:9001".parse().unwrap(),
1235        ]));
1236        let tls = MsgBuf::new(&b""[..]);
1237        let _outbound = builder.launch_client(tls, rt, fake_mq());
1238    }
1239
1240    #[test]
1241    fn check_match() {
1242        test_with_one_runtime!(|rt| async move {
1243            let chan = fake_channel(rt, ChannelType::ClientInitiator);
1244
1245            let t1 = OwnedChanTarget::builder()
1246                .ed_identity([6; 32].into())
1247                .rsa_identity([10; 20].into())
1248                .build()
1249                .unwrap();
1250            let t2 = OwnedChanTarget::builder()
1251                .ed_identity([1; 32].into())
1252                .rsa_identity([3; 20].into())
1253                .build()
1254                .unwrap();
1255            let t3 = OwnedChanTarget::builder()
1256                .ed_identity([3; 32].into())
1257                .rsa_identity([2; 20].into())
1258                .build()
1259                .unwrap();
1260
1261            assert!(chan.check_match(&t1).is_ok());
1262            assert!(chan.check_match(&t2).is_err());
1263            assert!(chan.check_match(&t3).is_err());
1264        });
1265    }
1266
1267    #[test]
1268    fn unique_id() {
1269        test_with_one_runtime!(|rt| async move {
1270            let ch1 = fake_channel(rt.clone(), ChannelType::ClientInitiator);
1271            let ch2 = fake_channel(rt, ChannelType::ClientInitiator);
1272            assert_ne!(ch1.unique_id(), ch2.unique_id());
1273        });
1274    }
1275
1276    #[test]
1277    fn duration_unused_at() {
1278        test_with_one_runtime!(|rt| async move {
1279            let details = fake_channel_details();
1280            let mut ch = fake_channel(rt, ChannelType::ClientInitiator);
1281            ch.details = details.clone();
1282            details.unused_since.update();
1283            assert!(ch.duration_unused().is_some());
1284        });
1285    }
1286}