tor_proto/
channel.rs

1//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2//!
3//! Channels form the basis of the rest of the Tor protocol: they are
4//! the only way for two Tor instances to talk.
5//!
6//! Channels are not useful directly for application requests: after
7//! making a channel, it needs to get used to build circuits, and the
8//! circuits are used to anonymize streams.  The streams are the
9//! objects corresponding to directory requests.
10//!
11//! In general, you shouldn't try to manage channels on your own;
12//! use the `tor-chanmgr` crate instead.
13//!
14//! To launch a channel:
15//!
16//!  * Create a TLS connection as an object that implements AsyncRead +
17//!    AsyncWrite + StreamOps, and pass it to a [ChannelBuilder].  This will
18//!    yield an [handshake::ClientInitiatorHandshake] that represents
19//!    the state of the handshake.
20//!  * Call [handshake::ClientInitiatorHandshake::connect] on the result
21//!    to negotiate the rest of the handshake.  This will verify
22//!    syntactic correctness of the handshake, but not its cryptographic
23//!    integrity.
24//!  * Call handshake::UnverifiedChannel::check on the result.  This
25//!    finishes the cryptographic checks.
26//!  * Call handshake::VerifiedChannel::finish on the result. This
27//!    completes the handshake and produces an open channel and Reactor.
28//!  * Launch an asynchronous task to call the reactor's run() method.
29//!
30//! One you have a running channel, you can create circuits on it with
31//! its [Channel::new_tunnel] method.  See
32//! [crate::client::circuit::PendingClientTunnel] for information on how to
33//! proceed from there.
34//!
35//! # Design
36//!
37//! For now, this code splits the channel into two pieces: a "Channel"
38//! object that can be used by circuits to write cells onto the
39//! channel, and a "Reactor" object that runs as a task in the
40//! background, to read channel cells and pass them to circuits as
41//! appropriate.
42//!
43//! I'm not at all sure that's the best way to do that, but it's what
44//! I could think of.
45//!
46//! # Limitations
47//!
48//! TODO: There is no rate limiting or fairness.
49
50/// The size of the channel buffer for communication between `Channel` and its reactor.
51pub const CHANNEL_BUFFER_SIZE: usize = 128;
52
53mod circmap;
54mod handler;
55pub(crate) mod handshake;
56pub mod kist;
57mod msg;
58pub mod padding;
59pub mod params;
60mod reactor;
61mod unique_id;
62
63pub use crate::channel::params::*;
64use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, Reactor};
65pub use crate::channel::unique_id::UniqId;
66use crate::client::circuit::padding::{PaddingController, QueuedCellPaddingInfo};
67use crate::client::circuit::{PendingClientTunnel, TimeoutEstimator};
68use crate::memquota::{ChannelAccount, CircuitAccount, SpecificAccount as _};
69use crate::util::err::ChannelClosed;
70use crate::util::oneshot_broadcast;
71use crate::util::ts::AtomicOptTimestamp;
72use crate::{ClockSkew, client};
73use crate::{Error, Result};
74use cfg_if::cfg_if;
75use reactor::BoxedChannelStreamOps;
76use safelog::sensitive as sv;
77use std::future::{Future, IntoFuture};
78use std::pin::Pin;
79use std::sync::{Mutex, MutexGuard};
80use std::time::Duration;
81use tor_cell::chancell::ChanMsg;
82use tor_cell::chancell::msg::AnyChanMsg;
83use tor_cell::chancell::{AnyChanCell, CircId, msg::PaddingNegotiate};
84use tor_cell::restricted_msg;
85use tor_error::internal;
86use tor_linkspec::{HasRelayIds, OwnedChanTarget};
87use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
88use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider, StreamOps};
89
90#[cfg(feature = "circ-padding")]
91use tor_async_utils::counting_streams::{self, CountingSink, CountingStream};
92
93/// Imports that are re-exported pub if feature `testing` is enabled
94///
95/// Putting them together in a little module like this allows us to select the
96/// visibility for all of these things together.
97mod testing_exports {
98    #![allow(unreachable_pub)]
99    pub use super::reactor::CtrlMsg;
100    pub use crate::circuit::celltypes::CreateResponse;
101}
102#[cfg(feature = "testing")]
103pub use testing_exports::*;
104#[cfg(not(feature = "testing"))]
105use testing_exports::*;
106
107use asynchronous_codec;
108use futures::channel::mpsc;
109use futures::io::{AsyncRead, AsyncWrite};
110use oneshot_fused_workaround as oneshot;
111
112use educe::Educe;
113use futures::{FutureExt as _, Sink};
114use std::result::Result as StdResult;
115use std::sync::Arc;
116use std::task::{Context, Poll};
117
118use tracing::trace;
119
120// reexport
121#[cfg(feature = "relay")]
122pub use super::relay::channel::handshake::RelayInitiatorHandshake;
123use crate::channel::unique_id::CircUniqIdContext;
124pub use handshake::ClientInitiatorHandshake;
125
126use kist::KistParams;
127
128restricted_msg! {
129    /// A channel message that we allow to be sent from a server to a client on
130    /// an open channel.
131    ///
132    /// (An Open channel here is one on which we have received a NETINFO cell.)
133    ///
134    /// Note that an unexpected message type will _not_ be ignored: instead, it
135    /// will cause the channel to shut down.
136    #[derive(Clone, Debug)]
137    pub(crate) enum OpenChanMsgS2C : ChanMsg {
138        Padding,
139        Vpadding,
140        // Not Create*, since we are not a relay.
141        // Not Created, since we never send CREATE.
142        CreatedFast,
143        Created2,
144        Relay,
145        // Not RelayEarly, since we are a client.
146        Destroy,
147        // Not PaddingNegotiate, since we are not a relay.
148        // Not Versions, Certs, AuthChallenge, Authenticate: they are for handshakes.
149        // Not Authorize: it is reserved, but unused.
150    }
151}
152
153/// This indicate what type of channel it is. It allows us to decide for the correct channel cell
154/// state machines and authentication process (if any).
155///
156/// It is created when a channel is requested for creation which means the subsystem wanting to
157/// open a channel needs to know what type it wants.
158#[derive(Clone, Copy, Debug, derive_more::Display)]
159#[non_exhaustive]
160pub enum ChannelType {
161    /// Client: Initiated from a client to a relay. Client is unauthenticated and relay is
162    /// authenticated.
163    ClientInitiator,
164    /// Relay: Initiating as a relay to a relay. Both sides are authenticated.
165    RelayInitiator,
166    /// Relay: Responding as a relay to a relay or client. Authenticated or Unauthenticated.
167    RelayResponder {
168        /// Indicate if the channel is authenticated. Responding as a relay can be either from a
169        /// Relay (authenticated) or a Client/Bridge (Unauthenticated). We only know this
170        /// information once the handshake is completed.
171        ///
172        /// This side is always authenticated, the other side can be if a relay or not if
173        /// bridge/client. This is set to false unless we end up authenticating the other side
174        /// meaning a relay.
175        authenticated: bool,
176    },
177}
178
179impl ChannelType {
180    /// Return true if this channel type is an initiator.
181    pub(crate) fn is_initiator(&self) -> bool {
182        matches!(self, Self::ClientInitiator | Self::RelayInitiator)
183    }
184}
185
186/// A channel cell frame used for sending and receiving cells on a channel. The handler takes care
187/// of the cell codec transition depending in which state the channel is.
188///
189/// ChannelFrame is used to basically handle all in and outbound cells on a channel for its entire
190/// lifetime.
191pub(crate) type ChannelFrame<T> = asynchronous_codec::Framed<T, handler::ChannelCellHandler>;
192
193/// An entry in a channel's queue of cells to be flushed.
194pub(crate) type ChanCellQueueEntry = (AnyChanCell, Option<QueuedCellPaddingInfo>);
195
196/// Helper: Return a new channel frame [ChannelFrame] from an object implementing AsyncRead + AsyncWrite. In the
197/// tor context, it is always a TLS stream.
198///
199/// The ty (type) argument needs to be able to transform into a [handler::ChannelCellHandler] which would
200/// generally be a [ChannelType].
201pub(crate) fn new_frame<T, I>(tls: T, ty: I) -> ChannelFrame<T>
202where
203    T: AsyncRead + AsyncWrite,
204    I: Into<handler::ChannelCellHandler>,
205{
206    asynchronous_codec::Framed::new(tls, ty.into())
207}
208
209/// An open client channel, ready to send and receive Tor cells.
210///
211/// A channel is a direct connection to a Tor relay, implemented using TLS.
212///
213/// This struct is a frontend that can be used to send cells
214/// and otherwise control the channel.  The main state is
215/// in the Reactor object.
216///
217/// (Users need a mutable reference because of the types in `Sink`, and
218/// ultimately because `cell_tx: mpsc::Sender` doesn't work without mut.
219///
220/// # Channel life cycle
221///
222/// Channels can be created directly here through the [`ChannelBuilder`] API.
223/// For a higher-level API (with better support for TLS, pluggable transports,
224/// and channel reuse) see the `tor-chanmgr` crate.
225///
226/// After a channel is created, it will persist until it is closed in one of
227/// four ways:
228///    1. A remote error occurs.
229///    2. The other side of the channel closes the channel.
230///    3. Someone calls [`Channel::terminate`] on the channel.
231///    4. The last reference to the `Channel` is dropped. (Note that every circuit
232///       on a `Channel` keeps a reference to it, which will in turn keep the
233///       channel from closing until all those circuits have gone away.)
234///
235/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
236/// will just be unusable for most purposes.  Most operations on it will fail
237/// with an error.
238#[derive(Debug)]
239pub struct Channel {
240    /// The channel type.
241    #[expect(unused)] // TODO: Remove once used.
242    channel_type: ChannelType,
243    /// A channel used to send control messages to the Reactor.
244    control: mpsc::UnboundedSender<CtrlMsg>,
245    /// A channel used to send cells to the Reactor.
246    cell_tx: CellTx,
247
248    /// A receiver that indicates whether the channel is closed.
249    ///
250    /// Awaiting will return a `CancelledError` event when the reactor is dropped.
251    /// Read to decide if operations may succeed, and is returned by `wait_for_close`.
252    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
253
254    /// Padding controller, used to report when data is queued for this channel.
255    padding_ctrl: PaddingController,
256
257    /// A unique identifier for this channel.
258    unique_id: UniqId,
259    /// Validated identity and address information for this peer.
260    peer_id: OwnedChanTarget,
261    /// The declared clock skew on this channel, at the time when this channel was
262    /// created.
263    clock_skew: ClockSkew,
264    /// The time when this channel was successfully completed
265    opened_at: coarsetime::Instant,
266    /// Mutable state used by the `Channel.
267    mutable: Mutex<MutableDetails>,
268
269    /// Information shared with the reactor
270    details: Arc<ChannelDetails>,
271}
272
273/// This is information shared between the reactor and the frontend (`Channel` object).
274///
275/// `control` can't be here because we rely on it getting dropped when the last user goes away.
276#[derive(Debug)]
277pub(crate) struct ChannelDetails {
278    /// Since when the channel became unused.
279    ///
280    /// If calling `time_since_update` returns None,
281    /// this channel is still in use by at least one circuit.
282    ///
283    /// Set by reactor when a circuit is added or removed.
284    /// Read from `Channel::duration_unused`.
285    unused_since: AtomicOptTimestamp,
286    /// Memory quota account
287    ///
288    /// This is here partly because we need to ensure it lives as long as the channel,
289    /// as otherwise the memquota system will tear the account down.
290    #[allow(dead_code)]
291    memquota: ChannelAccount,
292}
293
294/// Mutable details (state) used by the `Channel` (frontend)
295#[derive(Debug, Default)]
296struct MutableDetails {
297    /// State used to control padding
298    padding: PaddingControlState,
299}
300
301/// State used to control padding
302///
303/// We store this here because:
304///
305///  1. It must be per-channel, because it depends on channel usage.  So it can't be in
306///     (for example) `ChannelPaddingInstructionsUpdate`.
307///
308///  2. It could be in the channel manager's per-channel state but (for code flow reasons
309///     there, really) at the point at which the channel manager concludes for a pending
310///     channel that it ought to update the usage, it has relinquished the lock on its own data
311///     structure.
312///     And there is actually no need for this to be global: a per-channel lock is better than
313///     reacquiring the global one.
314///
315///  3. It doesn't want to be in the channel reactor since that's super hot.
316///
317/// See also the overview at [`tor_proto::channel::padding`](padding)
318#[derive(Debug, Educe)]
319#[educe(Default)]
320enum PaddingControlState {
321    /// No usage of this channel, so far, implies sending or negotiating channel padding.
322    ///
323    /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
324    /// with the following consequences:
325    ///
326    ///  * We don't enable our own padding.
327    ///  * We don't do any work to change the timeout distribution in the padding timer,
328    ///    (which is fine since this timer is not enabled).
329    ///  * We don't send any PADDING_NEGOTIATE cells.  The peer is supposed to come to the
330    ///    same conclusions as us, based on channel usage: it should also not send padding.
331    #[educe(Default)]
332    UsageDoesNotImplyPadding {
333        /// The last padding parameters (from reparameterize)
334        ///
335        /// We keep this so that we can send it if and when
336        /// this channel starts to be used in a way that implies (possibly) sending padding.
337        padding_params: ChannelPaddingInstructionsUpdates,
338    },
339
340    /// Some usage of this channel implies possibly sending channel padding
341    ///
342    /// The required padding timer, negotiation cell, etc.,
343    /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
344    ///
345    /// Once we have set this variant, it remains this way forever for this channel,
346    /// (the spec speaks of channels "only used for" certain purposes not getting padding).
347    PaddingConfigured,
348}
349
350use PaddingControlState as PCS;
351
352cfg_if! {
353    if #[cfg(feature="circ-padding")] {
354        /// Implementation type for a ChannelSender.
355        type CellTx = CountingSink<mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>>;
356
357        /// Implementation type for a cell queue held by a reactor.
358        type CellRx = CountingStream<mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>>;
359    } else {
360        /// Implementation type for a ChannelSender.
361        type CellTx = mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>;
362
363        /// Implementation type for a cell queue held by a reactor.
364        type CellRx = mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>;
365    }
366}
367
368/// A handle to a [`Channel`]` that can be used, by circuits, to send channel cells.
369#[derive(Debug)]
370pub(crate) struct ChannelSender {
371    /// MPSC sender to send cells.
372    cell_tx: CellTx,
373    /// A receiver used to check if the channel is closed.
374    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
375    /// Unique ID for this channel. For logging.
376    unique_id: UniqId,
377    /// Padding controller for this channel:
378    /// used to report when we queue data that will eventually wind up on the channel.
379    padding_ctrl: PaddingController,
380}
381
382impl ChannelSender {
383    /// Check whether a cell type is permissible to be _sent_ on an
384    /// open client channel.
385    fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
386        use tor_cell::chancell::msg::AnyChanMsg::*;
387        let msg = cell.msg();
388        match msg {
389            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
390                "Can't send {} cell on client channel",
391                msg.cmd()
392            ))),
393            Certs(_) | Versions(_) | Authenticate(_) | AuthChallenge(_) | Netinfo(_) => {
394                Err(Error::from(internal!(
395                    "Can't send {} cell after handshake is done",
396                    msg.cmd()
397                )))
398            }
399            _ => Ok(()),
400        }
401    }
402
403    /// Obtain a reference to the `ChannelSender`'s [`DynTimeProvider`]
404    ///
405    /// (This can sometimes be used to avoid having to keep
406    /// a separate clone of the time provider.)
407    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
408        cfg_if! {
409            if #[cfg(feature="circ-padding")] {
410                self.cell_tx.inner().time_provider()
411            } else {
412                self.cell_tx.time_provider()
413            }
414        }
415    }
416
417    /// Return an approximate count of the number of outbound cells queued for this channel.
418    ///
419    /// This count is necessarily approximate,
420    /// because the underlying count can be modified by other senders and receivers
421    /// between when this method is called and when its return value is used.
422    ///
423    /// Does not include cells that have already been passed to the TLS connection.
424    ///
425    /// Circuit padding uses this count to determine
426    /// when messages are already outbound for the first hop of a circuit.
427    #[cfg(feature = "circ-padding")]
428    pub(crate) fn approx_count(&self) -> usize {
429        self.cell_tx.approx_count()
430    }
431
432    /// Note that a cell has been queued that will eventually be placed onto this sender.
433    ///
434    /// We use this as an input for padding machines.
435    pub(crate) fn note_cell_queued(&self) {
436        self.padding_ctrl.queued_data(crate::HopNum::from(0));
437    }
438}
439
440impl Sink<ChanCellQueueEntry> for ChannelSender {
441    type Error = Error;
442
443    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
444        let this = self.get_mut();
445        Pin::new(&mut this.cell_tx)
446            .poll_ready(cx)
447            .map_err(|_| ChannelClosed.into())
448    }
449
450    fn start_send(self: Pin<&mut Self>, cell: ChanCellQueueEntry) -> Result<()> {
451        let this = self.get_mut();
452        if this.reactor_closed_rx.is_ready() {
453            return Err(ChannelClosed.into());
454        }
455        this.check_cell(&cell.0)?;
456        {
457            use tor_cell::chancell::msg::AnyChanMsg::*;
458            match cell.0.msg() {
459                Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
460                _ => trace!(
461                    channel_id = %this.unique_id,
462                    "Sending {} for {}",
463                    cell.0.msg().cmd(),
464                    CircId::get_or_zero(cell.0.circid())
465                ),
466            }
467        }
468
469        Pin::new(&mut this.cell_tx)
470            .start_send(cell)
471            .map_err(|_| ChannelClosed.into())
472    }
473
474    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
475        let this = self.get_mut();
476        Pin::new(&mut this.cell_tx)
477            .poll_flush(cx)
478            .map_err(|_| ChannelClosed.into())
479    }
480
481    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
482        let this = self.get_mut();
483        Pin::new(&mut this.cell_tx)
484            .poll_close(cx)
485            .map_err(|_| ChannelClosed.into())
486    }
487}
488
489/// Structure for building and launching a Tor channel.
490#[derive(Default)]
491pub struct ChannelBuilder {
492    /// If present, a description of the address we're trying to connect to,
493    /// and the way in which we are trying to connect to it.
494    ///
495    /// TODO: at some point, check this against the addresses in the netinfo
496    /// cell too.
497    target: Option<tor_linkspec::ChannelMethod>,
498}
499
500impl ChannelBuilder {
501    /// Construct a new ChannelBuilder.
502    pub fn new() -> Self {
503        ChannelBuilder::default()
504    }
505
506    /// Set the declared target method of this channel to correspond to a direct
507    /// connection to a given socket address.
508    #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
509    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
510        self.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![target]));
511    }
512
513    /// Set the declared target method of this channel.
514    ///
515    /// Note that nothing enforces the correctness of this method: it
516    /// doesn't have to match the real method used to create the TLS
517    /// stream.
518    pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
519        self.target = Some(target);
520    }
521
522    /// Launch a new client handshake over a TLS stream.
523    ///
524    /// After calling this function, you'll need to call `connect()` on
525    /// the result to start the handshake.  If that succeeds, you'll have
526    /// authentication info from the relay: call `check()` on the result
527    /// to check that.  Finally, to finish the handshake, call `finish()`
528    /// on the result of _that_.
529    pub fn launch_client<T, S>(
530        self,
531        tls: T,
532        sleep_prov: S,
533        memquota: ChannelAccount,
534    ) -> ClientInitiatorHandshake<T, S>
535    where
536        T: AsyncRead + AsyncWrite + StreamOps + Send + Unpin + 'static,
537        S: CoarseTimeProvider + SleepProvider,
538    {
539        handshake::ClientInitiatorHandshake::new(tls, self.target, sleep_prov, memquota)
540    }
541}
542
543impl Channel {
544    /// Construct a channel and reactor.
545    ///
546    /// Internal method, called to finalize the channel when we've
547    /// sent our netinfo cell, received the peer's netinfo cell, and
548    /// we're finally ready to create circuits.
549    ///
550    /// Quick note on the allow clippy. This is has one call site so for now, it is fine that we
551    /// bust the mighty 7 arguments.
552    #[allow(clippy::too_many_arguments)] // TODO consider if we want a builder
553    fn new<S>(
554        channel_type: ChannelType,
555        link_protocol: u16,
556        sink: BoxedChannelSink,
557        stream: BoxedChannelStream,
558        streamops: BoxedChannelStreamOps,
559        unique_id: UniqId,
560        peer_id: OwnedChanTarget,
561        clock_skew: ClockSkew,
562        sleep_prov: S,
563        memquota: ChannelAccount,
564    ) -> Result<(Arc<Self>, reactor::Reactor<S>)>
565    where
566        S: CoarseTimeProvider + SleepProvider,
567    {
568        use circmap::{CircIdRange, CircMap};
569        let circmap = CircMap::new(CircIdRange::High);
570        let dyn_time = DynTimeProvider::new(sleep_prov.clone());
571
572        let (control_tx, control_rx) = mpsc::unbounded();
573        let (cell_tx, cell_rx) = mq_queue::MpscSpec::new(CHANNEL_BUFFER_SIZE)
574            .new_mq(dyn_time.clone(), memquota.as_raw_account())?;
575        #[cfg(feature = "circ-padding")]
576        let (cell_tx, cell_rx) = counting_streams::channel(cell_tx, cell_rx);
577        let unused_since = AtomicOptTimestamp::new();
578        unused_since.update();
579
580        let mutable = MutableDetails::default();
581        let (reactor_closed_tx, reactor_closed_rx) = oneshot_broadcast::channel();
582
583        let details = ChannelDetails {
584            unused_since,
585            memquota,
586        };
587        let details = Arc::new(details);
588
589        // We might be using experimental maybenot padding; this creates the padding framework for that.
590        //
591        // TODO: This backend is currently optimized for circuit padding,
592        // so it might allocate a bit more than necessary to account for multiple hops.
593        // We should tune it when we deploy padding in production.
594        let (padding_ctrl, padding_event_stream) =
595            client::circuit::padding::new_padding(DynTimeProvider::new(sleep_prov.clone()));
596
597        let channel = Arc::new(Channel {
598            channel_type,
599            control: control_tx,
600            cell_tx,
601            reactor_closed_rx,
602            padding_ctrl: padding_ctrl.clone(),
603            unique_id,
604            peer_id,
605            clock_skew,
606            opened_at: coarsetime::Instant::now(),
607            mutable: Mutex::new(mutable),
608            details: Arc::clone(&details),
609        });
610
611        // We start disabled; the channel manager will `reconfigure` us soon after creation.
612        let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov.clone(), None)?);
613
614        cfg_if! {
615            if #[cfg(feature = "circ-padding")] {
616                use crate::util::sink_blocker::{SinkBlocker,CountingPolicy};
617                let sink = SinkBlocker::new(sink, CountingPolicy::new_unlimited());
618            }
619        }
620
621        let reactor = Reactor {
622            runtime: sleep_prov,
623            control: control_rx,
624            cells: cell_rx,
625            reactor_closed_tx,
626            input: futures::StreamExt::fuse(stream),
627            output: sink,
628            streamops,
629            circs: circmap,
630            circ_unique_id_ctx: CircUniqIdContext::new(),
631            link_protocol,
632            unique_id,
633            details,
634            padding_timer,
635            padding_ctrl,
636            padding_event_stream,
637            padding_blocker: None,
638            special_outgoing: Default::default(),
639        };
640
641        Ok((channel, reactor))
642    }
643
644    /// Return a process-unique identifier for this channel.
645    pub fn unique_id(&self) -> UniqId {
646        self.unique_id
647    }
648
649    /// Return a reference to the memory tracking account for this Channel
650    pub fn mq_account(&self) -> &ChannelAccount {
651        &self.details.memquota
652    }
653
654    /// Obtain a reference to the `Channel`'s [`DynTimeProvider`]
655    ///
656    /// (This can sometimes be used to avoid having to keep
657    /// a separate clone of the time provider.)
658    pub fn time_provider(&self) -> &DynTimeProvider {
659        cfg_if! {
660            if #[cfg(feature="circ-padding")] {
661                self.cell_tx.inner().time_provider()
662            } else {
663                self.cell_tx.time_provider()
664            }
665        }
666    }
667
668    /// Return an OwnedChanTarget representing the actual handshake used to
669    /// create this channel.
670    pub fn target(&self) -> &OwnedChanTarget {
671        &self.peer_id
672    }
673
674    /// Return the amount of time that has passed since this channel became open.
675    pub fn age(&self) -> Duration {
676        self.opened_at.elapsed().into()
677    }
678
679    /// Return a ClockSkew declaring how much clock skew the other side of this channel
680    /// claimed that we had when we negotiated the connection.
681    pub fn clock_skew(&self) -> ClockSkew {
682        self.clock_skew
683    }
684
685    /// Send a control message
686    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
687        self.control
688            .unbounded_send(msg)
689            .map_err(|_| ChannelClosed)?;
690        Ok(())
691    }
692
693    /// Acquire the lock on `mutable` (and handle any poison error)
694    fn mutable(&self) -> MutexGuard<MutableDetails> {
695        self.mutable.lock().expect("channel details poisoned")
696    }
697
698    /// Specify that this channel should do activities related to channel padding
699    ///
700    /// Initially, the channel does nothing related to channel padding:
701    /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
702    ///
703    /// After this function has been called, it will do both,
704    /// according to the parameters specified through `reparameterize`.
705    /// Note that this might include *disabling* padding
706    /// (for example, by sending a `PADDING_NEGOTIATE`).
707    ///
708    /// Idempotent.
709    ///
710    /// There is no way to undo the effect of this call.
711    pub fn engage_padding_activities(&self) {
712        let mut mutable = self.mutable();
713
714        match &mutable.padding {
715            PCS::UsageDoesNotImplyPadding {
716                padding_params: params,
717            } => {
718                // Well, apparently the channel usage *does* imply padding now,
719                // so we need to (belatedly) enable the timer,
720                // send the padding negotiation cell, etc.
721                let mut params = params.clone();
722
723                // Except, maybe the padding we would be requesting is precisely default,
724                // so we wouldn't actually want to send that cell.
725                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
726                    params.padding_negotiate = None;
727                }
728
729                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
730                    Ok(()) => {}
731                    Err(ChannelClosed) => return,
732                }
733
734                mutable.padding = PCS::PaddingConfigured;
735            }
736
737            PCS::PaddingConfigured => {
738                // OK, nothing to do
739            }
740        }
741
742        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
743    }
744
745    /// Reparameterise (update parameters; reconfigure)
746    ///
747    /// Returns `Err` if the channel was closed earlier
748    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
749        let mut mutable = self
750            .mutable
751            .lock()
752            .map_err(|_| internal!("channel details poisoned"))?;
753
754        match &mut mutable.padding {
755            PCS::PaddingConfigured => {
756                self.send_control(CtrlMsg::ConfigUpdate(params))?;
757            }
758            PCS::UsageDoesNotImplyPadding { padding_params } => {
759                padding_params.combine(&params);
760            }
761        }
762
763        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
764        Ok(())
765    }
766
767    /// Update the KIST parameters.
768    ///
769    /// Returns `Err` if the channel is closed.
770    pub fn reparameterize_kist(&self, kist_params: KistParams) -> Result<()> {
771        Ok(self.send_control(CtrlMsg::KistConfigUpdate(kist_params))?)
772    }
773
774    /// Return an error if this channel is somehow mismatched with the
775    /// given target.
776    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
777        check_id_match_helper(&self.peer_id, target)
778    }
779
780    /// Return true if this channel is closed and therefore unusable.
781    pub fn is_closing(&self) -> bool {
782        self.reactor_closed_rx.is_ready()
783    }
784
785    /// If the channel is not in use, return the amount of time
786    /// it has had with no circuits.
787    ///
788    /// Return `None` if the channel is currently in use.
789    pub fn duration_unused(&self) -> Option<std::time::Duration> {
790        self.details
791            .unused_since
792            .time_since_update()
793            .map(Into::into)
794    }
795
796    /// Return a new [`ChannelSender`] to transmit cells on this channel.
797    pub(crate) fn sender(&self) -> ChannelSender {
798        ChannelSender {
799            cell_tx: self.cell_tx.clone(),
800            reactor_closed_rx: self.reactor_closed_rx.clone(),
801            unique_id: self.unique_id,
802            padding_ctrl: self.padding_ctrl.clone(),
803        }
804    }
805
806    /// Return a newly allocated PendingClientTunnel object with
807    /// a corresponding tunnel reactor. A circuit ID is allocated, but no
808    /// messages are sent, and no cryptography is done.
809    ///
810    /// To use the results of this method, call Reactor::run() in a
811    /// new task, then use the methods of
812    /// [crate::client::circuit::PendingClientTunnel] to build the circuit.
813    pub async fn new_tunnel(
814        self: &Arc<Self>,
815        timeouts: Arc<dyn TimeoutEstimator>,
816    ) -> Result<(PendingClientTunnel, client::reactor::Reactor)> {
817        if self.is_closing() {
818            return Err(ChannelClosed.into());
819        }
820
821        let time_prov = self.time_provider().clone();
822        let memquota = CircuitAccount::new(&self.details.memquota)?;
823
824        // TODO: blocking is risky, but so is unbounded.
825        let (sender, receiver) =
826            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
827        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
828
829        let (tx, rx) = oneshot::channel();
830        self.send_control(CtrlMsg::AllocateCircuit {
831            created_sender: createdsender,
832            sender,
833            tx,
834        })?;
835        let (id, circ_unique_id, padding_ctrl, padding_stream) =
836            rx.await.map_err(|_| ChannelClosed)??;
837
838        trace!("{}: Allocated CircId {}", circ_unique_id, id);
839
840        Ok(PendingClientTunnel::new(
841            id,
842            self.clone(),
843            createdreceiver,
844            receiver,
845            circ_unique_id,
846            time_prov,
847            memquota,
848            padding_ctrl,
849            padding_stream,
850            timeouts,
851        ))
852    }
853
854    /// Shut down this channel immediately, along with all circuits that
855    /// are using it.
856    ///
857    /// Note that other references to this channel may exist.  If they
858    /// do, they will stop working after you call this function.
859    ///
860    /// It's not necessary to call this method if you're just done
861    /// with a channel: the channel should close on its own once nothing
862    /// is using it any more.
863    pub fn terminate(&self) {
864        let _ = self.send_control(CtrlMsg::Shutdown);
865    }
866
867    /// Tell the reactor that the circuit with the given ID has gone away.
868    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
869        self.send_control(CtrlMsg::CloseCircuit(circid))?;
870        Ok(())
871    }
872
873    /// Return a future that will resolve once this channel has closed.
874    ///
875    /// Note that this method does not _cause_ the channel to shut down on its own.
876    pub fn wait_for_close(
877        &self,
878    ) -> impl Future<Output = StdResult<CloseInfo, ClosedUnexpectedly>> + Send + Sync + 'static + use<>
879    {
880        self.reactor_closed_rx
881            .clone()
882            .into_future()
883            .map(|recv| match recv {
884                Ok(Ok(info)) => Ok(info),
885                Ok(Err(e)) => Err(ClosedUnexpectedly::ReactorError(e)),
886                Err(oneshot_broadcast::SenderDropped) => Err(ClosedUnexpectedly::ReactorDropped),
887            })
888    }
889
890    /// Install a [`CircuitPadder`](client::CircuitPadder) for this channel.
891    ///
892    /// Replaces any previous padder installed.
893    #[cfg(feature = "circ-padding-manual")]
894    pub async fn start_padding(self: &Arc<Self>, padder: client::CircuitPadder) -> Result<()> {
895        self.set_padder_impl(Some(padder)).await
896    }
897
898    /// Remove any [`CircuitPadder`](client::CircuitPadder) installed for this channel.
899    ///
900    /// Does nothing if there was not a padder installed there.
901    #[cfg(feature = "circ-padding-manual")]
902    pub async fn stop_padding(self: &Arc<Self>) -> Result<()> {
903        self.set_padder_impl(None).await
904    }
905
906    /// Replace the [`CircuitPadder`](client::CircuitPadder) installed for this channel with `padder`.
907    #[cfg(feature = "circ-padding-manual")]
908    async fn set_padder_impl(
909        self: &Arc<Self>,
910        padder: Option<client::CircuitPadder>,
911    ) -> Result<()> {
912        let (tx, rx) = oneshot::channel();
913        let msg = CtrlMsg::SetChannelPadder { padder, sender: tx };
914        self.control
915            .unbounded_send(msg)
916            .map_err(|_| Error::ChannelClosed(ChannelClosed))?;
917        rx.await.map_err(|_| Error::ChannelClosed(ChannelClosed))?
918    }
919
920    /// Make a new fake reactor-less channel.  For testing only, obviously.
921    ///
922    /// Returns the receiver end of the control message mpsc.
923    ///
924    /// Suitable for external callers who want to test behaviour
925    /// of layers including the logic in the channel frontend
926    /// (`Channel` object methods).
927    //
928    // This differs from test::fake_channel as follows:
929    //  * It returns the mpsc Receiver
930    //  * It does not require explicit specification of details
931    #[cfg(feature = "testing")]
932    pub fn new_fake(
933        rt: impl SleepProvider + CoarseTimeProvider,
934        channel_type: ChannelType,
935    ) -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
936        let (control, control_recv) = mpsc::unbounded();
937        let details = fake_channel_details();
938
939        let unique_id = UniqId::new();
940        let peer_id = OwnedChanTarget::builder()
941            .ed_identity([6_u8; 32].into())
942            .rsa_identity([10_u8; 20].into())
943            .build()
944            .expect("Couldn't construct peer id");
945
946        // This will make rx trigger immediately.
947        let (_tx, rx) = oneshot_broadcast::channel();
948        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
949
950        let channel = Channel {
951            channel_type,
952            control,
953            cell_tx: fake_mpsc().0,
954            reactor_closed_rx: rx,
955            padding_ctrl,
956            unique_id,
957            peer_id,
958            clock_skew: ClockSkew::None,
959            opened_at: coarsetime::Instant::now(),
960            mutable: Default::default(),
961            details,
962        };
963        (channel, control_recv)
964    }
965}
966
967/// If there is any identity in `wanted_ident` that is not present in
968/// `my_ident`, return a ChanMismatch error.
969///
970/// This is a helper for [`Channel::check_match`] and
971/// UnverifiedChannel::check_internal.
972fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
973where
974    T: HasRelayIds + ?Sized,
975    U: HasRelayIds + ?Sized,
976{
977    for desired in wanted_ident.identities() {
978        let id_type = desired.id_type();
979        match my_ident.identity(id_type) {
980            Some(actual) if actual == desired => {}
981            Some(actual) => {
982                return Err(Error::ChanMismatch(format!(
983                    "Identity {} does not match target {}",
984                    sv(actual),
985                    sv(desired)
986                )));
987            }
988            None => {
989                return Err(Error::ChanMismatch(format!(
990                    "Peer does not have {} identity",
991                    id_type
992                )));
993            }
994        }
995    }
996    Ok(())
997}
998
999impl HasRelayIds for Channel {
1000    fn identity(
1001        &self,
1002        key_type: tor_linkspec::RelayIdType,
1003    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
1004        self.peer_id.identity(key_type)
1005    }
1006}
1007
1008/// The status of a channel which was closed successfully.
1009///
1010/// **Note:** This doesn't have any associated data,
1011/// but may be expanded in the future.
1012// I can't think of any info we'd want to return to waiters,
1013// but this type leaves the possibility open without requiring any backwards-incompatible changes.
1014#[derive(Clone, Debug)]
1015#[non_exhaustive]
1016pub struct CloseInfo;
1017
1018/// The status of a channel which closed unexpectedly.
1019#[derive(Clone, Debug, thiserror::Error)]
1020#[non_exhaustive]
1021pub enum ClosedUnexpectedly {
1022    /// The channel reactor was dropped or panicked before completing.
1023    #[error("channel reactor was dropped or panicked before completing")]
1024    ReactorDropped,
1025    /// The channel reactor had an internal error.
1026    #[error("channel reactor had an internal error")]
1027    ReactorError(Error),
1028}
1029
1030/// Make some fake channel details (for testing only!)
1031#[cfg(any(test, feature = "testing"))]
1032fn fake_channel_details() -> Arc<ChannelDetails> {
1033    let unused_since = AtomicOptTimestamp::new();
1034
1035    Arc::new(ChannelDetails {
1036        unused_since,
1037        memquota: crate::util::fake_mq(),
1038    })
1039}
1040
1041/// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
1042#[cfg(any(test, feature = "testing"))] // Used by Channel::new_fake which is also feature=testing
1043pub(crate) fn fake_mpsc() -> (CellTx, CellRx) {
1044    let (tx, rx) = crate::fake_mpsc(CHANNEL_BUFFER_SIZE);
1045    #[cfg(feature = "circ-padding")]
1046    let (tx, rx) = counting_streams::channel(tx, rx);
1047    (tx, rx)
1048}
1049
1050#[cfg(test)]
1051pub(crate) mod test {
1052    // Most of this module is tested via tests that also check on the
1053    // reactor code; there are just a few more cases to examine here.
1054    #![allow(clippy::unwrap_used)]
1055    use super::*;
1056    use crate::channel::handler::test::MsgBuf;
1057    pub(crate) use crate::channel::reactor::test::{CodecResult, new_reactor};
1058    use crate::util::fake_mq;
1059    use tor_cell::chancell::msg::HandshakeType;
1060    use tor_cell::chancell::{AnyChanCell, msg};
1061    use tor_rtcompat::{PreferredRuntime, test_with_one_runtime};
1062
1063    /// Make a new fake reactor-less channel.  For testing only, obviously.
1064    pub(crate) fn fake_channel(
1065        rt: impl SleepProvider + CoarseTimeProvider,
1066        channel_type: ChannelType,
1067    ) -> Channel {
1068        let unique_id = UniqId::new();
1069        let peer_id = OwnedChanTarget::builder()
1070            .ed_identity([6_u8; 32].into())
1071            .rsa_identity([10_u8; 20].into())
1072            .build()
1073            .expect("Couldn't construct peer id");
1074        // This will make rx trigger immediately.
1075        let (_tx, rx) = oneshot_broadcast::channel();
1076        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
1077        Channel {
1078            channel_type,
1079            control: mpsc::unbounded().0,
1080            cell_tx: fake_mpsc().0,
1081            reactor_closed_rx: rx,
1082            padding_ctrl,
1083            unique_id,
1084            peer_id,
1085            clock_skew: ClockSkew::None,
1086            opened_at: coarsetime::Instant::now(),
1087            mutable: Default::default(),
1088            details: fake_channel_details(),
1089        }
1090    }
1091
1092    #[test]
1093    fn send_bad() {
1094        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1095            use std::error::Error;
1096            let chan = fake_channel(rt, ChannelType::ClientInitiator);
1097
1098            let cell = AnyChanCell::new(CircId::new(7), msg::Created2::new(&b"hihi"[..]).into());
1099            let e = chan.sender().check_cell(&cell);
1100            assert!(e.is_err());
1101            assert!(
1102                format!("{}", e.unwrap_err().source().unwrap())
1103                    .contains("Can't send CREATED2 cell on client channel")
1104            );
1105            let cell = AnyChanCell::new(None, msg::Certs::new_empty().into());
1106            let e = chan.sender().check_cell(&cell);
1107            assert!(e.is_err());
1108            assert!(
1109                format!("{}", e.unwrap_err().source().unwrap())
1110                    .contains("Can't send CERTS cell after handshake is done")
1111            );
1112
1113            let cell = AnyChanCell::new(
1114                CircId::new(5),
1115                msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
1116            );
1117            let e = chan.sender().check_cell(&cell);
1118            assert!(e.is_ok());
1119            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
1120            // let got = output.next().await.unwrap();
1121            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
1122        });
1123    }
1124
1125    #[test]
1126    fn chanbuilder() {
1127        let rt = PreferredRuntime::create().unwrap();
1128        let mut builder = ChannelBuilder::default();
1129        builder.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![
1130            "127.0.0.1:9001".parse().unwrap(),
1131        ]));
1132        let tls = MsgBuf::new(&b""[..]);
1133        let _outbound = builder.launch_client(tls, rt, fake_mq());
1134    }
1135
1136    #[test]
1137    fn check_match() {
1138        test_with_one_runtime!(|rt| async move {
1139            let chan = fake_channel(rt, ChannelType::ClientInitiator);
1140
1141            let t1 = OwnedChanTarget::builder()
1142                .ed_identity([6; 32].into())
1143                .rsa_identity([10; 20].into())
1144                .build()
1145                .unwrap();
1146            let t2 = OwnedChanTarget::builder()
1147                .ed_identity([1; 32].into())
1148                .rsa_identity([3; 20].into())
1149                .build()
1150                .unwrap();
1151            let t3 = OwnedChanTarget::builder()
1152                .ed_identity([3; 32].into())
1153                .rsa_identity([2; 20].into())
1154                .build()
1155                .unwrap();
1156
1157            assert!(chan.check_match(&t1).is_ok());
1158            assert!(chan.check_match(&t2).is_err());
1159            assert!(chan.check_match(&t3).is_err());
1160        });
1161    }
1162
1163    #[test]
1164    fn unique_id() {
1165        test_with_one_runtime!(|rt| async move {
1166            let ch1 = fake_channel(rt.clone(), ChannelType::ClientInitiator);
1167            let ch2 = fake_channel(rt, ChannelType::ClientInitiator);
1168            assert_ne!(ch1.unique_id(), ch2.unique_id());
1169        });
1170    }
1171
1172    #[test]
1173    fn duration_unused_at() {
1174        test_with_one_runtime!(|rt| async move {
1175            let details = fake_channel_details();
1176            let mut ch = fake_channel(rt, ChannelType::ClientInitiator);
1177            ch.details = details.clone();
1178            details.unused_since.update();
1179            assert!(ch.duration_unused().is_some());
1180        });
1181    }
1182}