tor_proto/
channel.rs

1//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2//!
3//! Channels form the basis of the rest of the Tor protocol: they are
4//! the only way for two Tor instances to talk.
5//!
6//! Channels are not useful directly for application requests: after
7//! making a channel, it needs to get used to build circuits, and the
8//! circuits are used to anonymize streams.  The streams are the
9//! objects corresponding to directory requests.
10//!
11//! In general, you shouldn't try to manage channels on your own;
12//! use the `tor-chanmgr` crate instead.
13//!
14//! To launch a channel:
15//!
16//!  * Create a TLS connection as an object that implements AsyncRead +
17//!    AsyncWrite + StreamOps, and pass it to a [ChannelBuilder].  This will
18//!    yield an [handshake::OutboundClientHandshake] that represents
19//!    the state of the handshake.
20//!  * Call [handshake::OutboundClientHandshake::connect] on the result
21//!    to negotiate the rest of the handshake.  This will verify
22//!    syntactic correctness of the handshake, but not its cryptographic
23//!    integrity.
24//!  * Call [handshake::UnverifiedChannel::check] on the result.  This
25//!    finishes the cryptographic checks.
26//!  * Call [handshake::VerifiedChannel::finish] on the result. This
27//!    completes the handshake and produces an open channel and Reactor.
28//!  * Launch an asynchronous task to call the reactor's run() method.
29//!
//! Once you have a running channel, you can create circuits on it with
31//! its [Channel::new_circ] method.  See
32//! [crate::tunnel::circuit::PendingClientCirc] for information on how to
33//! proceed from there.
34//!
35//! # Design
36//!
37//! For now, this code splits the channel into two pieces: a "Channel"
38//! object that can be used by circuits to write cells onto the
39//! channel, and a "Reactor" object that runs as a task in the
40//! background, to read channel cells and pass them to circuits as
41//! appropriate.
42//!
43//! I'm not at all sure that's the best way to do that, but it's what
44//! I could think of.
45//!
46//! # Limitations
47//!
48//! This is client-only, and only supports link protocol version 4.
49//!
50//! TODO: There is no channel padding.
51//!
52//! TODO: There is no flow control, rate limiting, queueing, or
53//! fairness.
54
/// The size of the channel buffer for communication between `Channel` and its reactor.
///
/// This value is passed to `MpscSpec::new` when `Channel::new` creates the
/// queue that carries outgoing cells from the `Channel` to its reactor.
pub const CHANNEL_BUFFER_SIZE: usize = 128;
57
58mod circmap;
59mod codec;
60mod handshake;
61pub mod kist;
62pub mod padding;
63pub mod params;
64mod reactor;
65mod unique_id;
66
67pub use crate::channel::params::*;
68use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, Reactor};
69pub use crate::channel::unique_id::UniqId;
70use crate::memquota::{ChannelAccount, CircuitAccount, SpecificAccount as _};
71use crate::util::err::ChannelClosed;
72use crate::util::oneshot_broadcast;
73use crate::util::ts::AtomicOptTimestamp;
74use crate::{tunnel, tunnel::circuit, ClockSkew};
75use crate::{Error, Result};
76use reactor::BoxedChannelStreamOps;
77use safelog::sensitive as sv;
78use std::future::{Future, IntoFuture};
79use std::pin::Pin;
80use std::sync::{Mutex, MutexGuard};
81use std::time::Duration;
82use tor_cell::chancell::msg::AnyChanMsg;
83use tor_cell::chancell::{msg, msg::PaddingNegotiate, AnyChanCell, CircId};
84use tor_cell::chancell::{ChanCell, ChanMsg};
85use tor_cell::restricted_msg;
86use tor_error::internal;
87use tor_linkspec::{HasRelayIds, OwnedChanTarget};
88use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
89use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider, StreamOps};
90
/// Imports that are re-exported pub if feature `testing` is enabled
///
/// Putting them together in a little module like this allows us to select the
/// visibility for all of these things together.
mod testing_exports {
    // The items below are declared `pub` so that the glob import following this
    // module can choose their final visibility: it is a `pub use` when the
    // `testing` feature is enabled and a private `use` otherwise.  In the latter
    // case the `pub` here is not reachable from outside the crate, hence the allow.
    #![allow(unreachable_pub)]
    pub use super::reactor::CtrlMsg;
    pub use crate::tunnel::circuit::celltypes::CreateResponse;
}
100#[cfg(feature = "testing")]
101pub use testing_exports::*;
102#[cfg(not(feature = "testing"))]
103use testing_exports::*;
104
105use asynchronous_codec;
106use futures::channel::mpsc;
107use futures::io::{AsyncRead, AsyncWrite};
108use oneshot_fused_workaround as oneshot;
109
110use educe::Educe;
111use futures::{FutureExt as _, Sink};
112use std::result::Result as StdResult;
113use std::sync::Arc;
114use std::task::{Context, Poll};
115
116use tracing::trace;
117
118// reexport
119use crate::channel::unique_id::CircUniqIdContext;
120#[cfg(test)]
121pub(crate) use codec::CodecError;
122pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
123
124use kist::KistParams;
125
restricted_msg! {
    /// A channel message that we allow to be sent from a server to a client on
    /// an open channel.
    ///
    /// (An "open" channel here is one on which we have received a NETINFO cell.)
    ///
    /// Note that an unexpected message type will _not_ be ignored: instead, it
    /// will cause the channel to shut down.
    //
    // NOTE(review): the variant list below is the complete set of accepted
    // message types; the `// Not ...` comments record why the other channel
    // message types are deliberately left out.
    #[derive(Clone, Debug)]
    pub(crate) enum OpenChanMsgS2C : ChanMsg {
        Padding,
        Vpadding,
        // Not Create*, since we are not a relay.
        // Not Created, since we never send CREATE.
        CreatedFast,
        Created2,
        Relay,
        // Not RelayEarly, since we are a client.
        Destroy,
        // Not PaddingNegotiate, since we are not a relay.
        // Not Versions, Certs, AuthChallenge, Authenticate: they are for handshakes.
        // Not Authorize: it is reserved, but unused.
    }
}
150
/// A channel cell that we allow to be sent on an open channel from
/// a server to a client.
pub(crate) type OpenChanCellS2C = ChanCell<OpenChanMsgS2C>;
154
/// Type alias: A Sink and Stream that transforms a TLS connection into
/// a cell-based communication mechanism.
///
/// `T` is the underlying connection being framed.  The codec is parameterized
/// with [`OpenChanMsgS2C`] and `AnyChanMsg`.
// NOTE(review): presumably the first codec parameter is the set of messages we
// accept from the peer and the second the set we may send -- confirm in `codec`.
type CellFrame<T> =
    asynchronous_codec::Framed<T, crate::channel::codec::ChannelCodec<OpenChanMsgS2C, AnyChanMsg>>;
159
/// An open client channel, ready to send and receive Tor cells.
///
/// A channel is a direct connection to a Tor relay, implemented using TLS.
///
/// This struct is a frontend that can be used to send cells
/// and otherwise control the channel.  The main state is
/// in the Reactor object.
///
/// (Cells are sent through a `ChannelSender` obtained from the channel.
/// Users of that sender need a mutable reference because of the types in `Sink`.)
///
/// # Channel life cycle
///
/// Channels can be created directly here through the [`ChannelBuilder`] API.
/// For a higher-level API (with better support for TLS, pluggable transports,
/// and channel reuse) see the `tor-chanmgr` crate.
///
/// After a channel is created, it will persist until it is closed in one of
/// four ways:
///    1. A remote error occurs.
///    2. The other side of the channel closes the channel.
///    3. Someone calls [`Channel::terminate`] on the channel.
///    4. The last reference to the `Channel` is dropped. (Note that every circuit
///       on a `Channel` keeps a reference to it, which will in turn keep the
///       channel from closing until all those circuits have gone away.)
///
/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
/// will just be unusable for most purposes.  Most operations on it will fail
/// with an error.
#[derive(Debug)]
pub struct Channel {
    /// A channel used to send control messages to the Reactor.
    control: mpsc::UnboundedSender<CtrlMsg>,
    /// A channel used to send cells to the Reactor.
    cell_tx: mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,

    /// A receiver that indicates whether the channel is closed.
    ///
    /// Awaiting will yield a `SenderDropped` error when the reactor is dropped
    /// without reporting a result.
    /// Read to decide if operations may succeed, and is returned by `wait_for_close`.
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,

    /// A unique identifier for this channel.
    unique_id: UniqId,
    /// Validated identity and address information for this peer.
    peer_id: OwnedChanTarget,
    /// The declared clock skew on this channel, at the time when this channel was
    /// created.
    clock_skew: ClockSkew,
    /// The time when this channel was successfully completed
    opened_at: coarsetime::Instant,
    /// Mutable state used by the `Channel`.
    mutable: Mutex<MutableDetails>,

    /// Information shared with the reactor
    details: Arc<ChannelDetails>,
}
217
/// This is information shared between the reactor and the frontend (`Channel` object).
///
/// `control` can't be here because we rely on it getting dropped when the last user goes away.
#[derive(Debug)]
pub(crate) struct ChannelDetails {
    /// Since when the channel became unused.
    ///
    /// If calling `time_since_update` returns None,
    /// this channel is still in use by at least one circuit.
    ///
    /// Set by reactor when a circuit is added or removed.
    /// Read from `Channel::duration_unused`.
    unused_since: AtomicOptTimestamp,
    /// Memory quota account
    ///
    /// This is here partly because we need to ensure it lives as long as the channel,
    /// as otherwise the memquota system will tear the account down.
    //
    // NOTE(review): this field is read by `Channel::mq_account` and
    // `Channel::new_circ`, so the `allow(dead_code)` may no longer be needed.
    #[allow(dead_code)]
    memquota: ChannelAccount,
}
238
/// Mutable details (state) used by the `Channel` (frontend)
///
/// Kept behind the `Mutex` in `Channel::mutable`.
#[derive(Debug, Default)]
struct MutableDetails {
    /// State used to control padding
    padding: PaddingControlState,
}
245
/// State used to control padding
///
/// We store this here because:
///
///  1. It must be per-channel, because it depends on channel usage.  So it can't be in
///     (for example) `ChannelPaddingInstructionsUpdates`.
///
///  2. It could be in the channel manager's per-channel state but (for code flow reasons
///     there, really) at the point at which the channel manager concludes for a pending
///     channel that it ought to update the usage, it has relinquished the lock on its own data
///     structure.
///     And there is actually no need for this to be global: a per-channel lock is better than
///     reacquiring the global one.
///
///  3. It doesn't want to be in the channel reactor since that's super hot.
///
/// See also the overview at [`tor_proto::channel::padding`](padding)
#[derive(Debug, Educe)]
#[educe(Default)]
enum PaddingControlState {
    /// No usage of this channel, so far, implies sending or negotiating channel padding.
    ///
    /// This is the default state.
    ///
    /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
    /// with the following consequences:
    ///
    ///  * We don't enable our own padding.
    ///  * We don't do any work to change the timeout distribution in the padding timer,
    ///    (which is fine since this timer is not enabled).
    ///  * We don't send any PADDING_NEGOTIATE cells.  The peer is supposed to come to the
    ///    same conclusions as us, based on channel usage: it should also not send padding.
    #[educe(Default)]
    UsageDoesNotImplyPadding {
        /// The last padding parameters (from reparameterize)
        ///
        /// We keep this so that we can send it if and when
        /// this channel starts to be used in a way that implies (possibly) sending padding.
        padding_params: ChannelPaddingInstructionsUpdates,
    },

    /// Some usage of this channel implies possibly sending channel padding
    ///
    /// The required padding timer, negotiation cell, etc.,
    /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
    ///
    /// Once we have set this variant, it remains this way forever for this channel,
    /// (the spec speaks of channels "only used for" certain purposes not getting padding).
    PaddingConfigured,
}
294
295use PaddingControlState as PCS;
296
/// A handle to a [`Channel`] that can be used, by circuits, to send channel cells.
///
/// Obtained from `Channel::sender`; implements `Sink<AnyChanCell>`.
#[derive(Debug)]
pub(crate) struct ChannelSender {
    /// MPSC sender to send cells.
    cell_tx: mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
    /// A receiver used to check if the channel is closed.
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
    /// Unique ID for this channel. For logging.
    unique_id: UniqId,
}
307
308impl ChannelSender {
309    /// Check whether a cell type is permissible to be _sent_ on an
310    /// open client channel.
311    fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
312        use msg::AnyChanMsg::*;
313        let msg = cell.msg();
314        match msg {
315            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
316                "Can't send {} cell on client channel",
317                msg.cmd()
318            ))),
319            Certs(_) | Versions(_) | Authenticate(_) | Authorize(_) | AuthChallenge(_)
320            | Netinfo(_) => Err(Error::from(internal!(
321                "Can't send {} cell after handshake is done",
322                msg.cmd()
323            ))),
324            _ => Ok(()),
325        }
326    }
327
328    /// Obtain a reference to the `ChannelSender`'s [`DynTimeProvider`]
329    ///
330    /// (This can sometimes be used to avoid having to keep
331    /// a separate clone of the time provider.)
332    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
333        self.cell_tx.time_provider()
334    }
335}
336
337impl Sink<AnyChanCell> for ChannelSender {
338    type Error = Error;
339
340    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
341        let this = self.get_mut();
342        Pin::new(&mut this.cell_tx)
343            .poll_ready(cx)
344            .map_err(|_| ChannelClosed.into())
345    }
346
347    fn start_send(self: Pin<&mut Self>, cell: AnyChanCell) -> Result<()> {
348        let this = self.get_mut();
349        if this.reactor_closed_rx.is_ready() {
350            return Err(ChannelClosed.into());
351        }
352        this.check_cell(&cell)?;
353        {
354            use msg::AnyChanMsg::*;
355            match cell.msg() {
356                Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
357                _ => trace!(
358                    "{}: Sending {} for {}",
359                    this.unique_id,
360                    cell.msg().cmd(),
361                    CircId::get_or_zero(cell.circid())
362                ),
363            }
364        }
365
366        Pin::new(&mut this.cell_tx)
367            .start_send(cell)
368            .map_err(|_| ChannelClosed.into())
369    }
370
371    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
372        let this = self.get_mut();
373        Pin::new(&mut this.cell_tx)
374            .poll_flush(cx)
375            .map_err(|_| ChannelClosed.into())
376    }
377
378    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
379        let this = self.get_mut();
380        Pin::new(&mut this.cell_tx)
381            .poll_close(cx)
382            .map_err(|_| ChannelClosed.into())
383    }
384}
385
/// Structure for building and launching a Tor channel.
///
/// Configure it with [`ChannelBuilder::set_declared_method`], then consume it
/// with [`ChannelBuilder::launch`] to begin the handshake.
#[derive(Default)]
pub struct ChannelBuilder {
    /// If present, a description of the address we're trying to connect to,
    /// and the way in which we are trying to connect to it.
    ///
    /// TODO: at some point, check this against the addresses in the netinfo
    /// cell too.
    target: Option<tor_linkspec::ChannelMethod>,
}
396
397impl ChannelBuilder {
398    /// Construct a new ChannelBuilder.
399    pub fn new() -> Self {
400        ChannelBuilder::default()
401    }
402
403    /// Set the declared target method of this channel to correspond to a direct
404    /// connection to a given socket address.
405    #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
406    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
407        self.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![target]));
408    }
409
410    /// Set the declared target method of this channel.
411    ///
412    /// Note that nothing enforces the correctness of this method: it
413    /// doesn't have to match the real method used to create the TLS
414    /// stream.
415    pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
416        self.target = Some(target);
417    }
418
419    /// Launch a new client handshake over a TLS stream.
420    ///
421    /// After calling this function, you'll need to call `connect()` on
422    /// the result to start the handshake.  If that succeeds, you'll have
423    /// authentication info from the relay: call `check()` on the result
424    /// to check that.  Finally, to finish the handshake, call `finish()`
425    /// on the result of _that_.
426    pub fn launch<T, S>(
427        self,
428        tls: T,
429        sleep_prov: S,
430        memquota: ChannelAccount,
431    ) -> OutboundClientHandshake<T, S>
432    where
433        T: AsyncRead + AsyncWrite + StreamOps + Send + Unpin + 'static,
434        S: CoarseTimeProvider + SleepProvider,
435    {
436        handshake::OutboundClientHandshake::new(tls, self.target, sleep_prov, memquota)
437    }
438}
439
440impl Channel {
441    /// Construct a channel and reactor.
442    ///
443    /// Internal method, called to finalize the channel when we've
444    /// sent our netinfo cell, received the peer's netinfo cell, and
445    /// we're finally ready to create circuits.
446    #[allow(clippy::too_many_arguments)] // TODO consider if we want a builder
447    fn new<S>(
448        link_protocol: u16,
449        sink: BoxedChannelSink,
450        stream: BoxedChannelStream,
451        streamops: BoxedChannelStreamOps,
452        unique_id: UniqId,
453        peer_id: OwnedChanTarget,
454        clock_skew: ClockSkew,
455        sleep_prov: S,
456        memquota: ChannelAccount,
457    ) -> Result<(Arc<Self>, reactor::Reactor<S>)>
458    where
459        S: CoarseTimeProvider + SleepProvider,
460    {
461        use circmap::{CircIdRange, CircMap};
462        let circmap = CircMap::new(CircIdRange::High);
463        let dyn_time = DynTimeProvider::new(sleep_prov.clone());
464
465        let (control_tx, control_rx) = mpsc::unbounded();
466        let (cell_tx, cell_rx) = mq_queue::MpscSpec::new(CHANNEL_BUFFER_SIZE)
467            .new_mq(dyn_time.clone(), memquota.as_raw_account())?;
468        let unused_since = AtomicOptTimestamp::new();
469        unused_since.update();
470
471        let mutable = MutableDetails::default();
472        let (reactor_closed_tx, reactor_closed_rx) = oneshot_broadcast::channel();
473
474        let details = ChannelDetails {
475            unused_since,
476            memquota,
477        };
478        let details = Arc::new(details);
479
480        let channel = Arc::new(Channel {
481            control: control_tx,
482            cell_tx,
483            reactor_closed_rx,
484            unique_id,
485            peer_id,
486            clock_skew,
487            opened_at: coarsetime::Instant::now(),
488            mutable: Mutex::new(mutable),
489            details: Arc::clone(&details),
490        });
491
492        // We start disabled; the channel manager will `reconfigure` us soon after creation.
493        let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov, None)?);
494
495        let reactor = Reactor {
496            control: control_rx,
497            cells: cell_rx,
498            reactor_closed_tx,
499            input: futures::StreamExt::fuse(stream),
500            output: sink,
501            streamops,
502            circs: circmap,
503            circ_unique_id_ctx: CircUniqIdContext::new(),
504            link_protocol,
505            unique_id,
506            details,
507            padding_timer,
508            special_outgoing: Default::default(),
509        };
510
511        Ok((channel, reactor))
512    }
513
514    /// Return a process-unique identifier for this channel.
515    pub fn unique_id(&self) -> UniqId {
516        self.unique_id
517    }
518
519    /// Return a reference to the memory tracking account for this Channel
520    pub fn mq_account(&self) -> &ChannelAccount {
521        &self.details.memquota
522    }
523
524    /// Obtain a reference to the `Channel`'s [`DynTimeProvider`]
525    ///
526    /// (This can sometimes be used to avoid having to keep
527    /// a separate clone of the time provider.)
528    pub fn time_provider(&self) -> &DynTimeProvider {
529        self.cell_tx.time_provider()
530    }
531
532    /// Return an OwnedChanTarget representing the actual handshake used to
533    /// create this channel.
534    pub fn target(&self) -> &OwnedChanTarget {
535        &self.peer_id
536    }
537
538    /// Return the amount of time that has passed since this channel became open.
539    pub fn age(&self) -> Duration {
540        self.opened_at.elapsed().into()
541    }
542
543    /// Return a ClockSkew declaring how much clock skew the other side of this channel
544    /// claimed that we had when we negotiated the connection.
545    pub fn clock_skew(&self) -> ClockSkew {
546        self.clock_skew
547    }
548
549    /// Send a control message
550    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
551        self.control
552            .unbounded_send(msg)
553            .map_err(|_| ChannelClosed)?;
554        Ok(())
555    }
556
557    /// Acquire the lock on `mutable` (and handle any poison error)
558    fn mutable(&self) -> MutexGuard<MutableDetails> {
559        self.mutable.lock().expect("channel details poisoned")
560    }
561
562    /// Specify that this channel should do activities related to channel padding
563    ///
564    /// Initially, the channel does nothing related to channel padding:
565    /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
566    ///
567    /// After this function has been called, it will do both,
568    /// according to the parameters specified through `reparameterize`.
569    /// Note that this might include *disabling* padding
570    /// (for example, by sending a `PADDING_NEGOTIATE`).
571    ///
572    /// Idempotent.
573    ///
574    /// There is no way to undo the effect of this call.
575    pub fn engage_padding_activities(&self) {
576        let mut mutable = self.mutable();
577
578        match &mutable.padding {
579            PCS::UsageDoesNotImplyPadding {
580                padding_params: params,
581            } => {
582                // Well, apparently the channel usage *does* imply padding now,
583                // so we need to (belatedly) enable the timer,
584                // send the padding negotiation cell, etc.
585                let mut params = params.clone();
586
587                // Except, maybe the padding we would be requesting is precisely default,
588                // so we wouldn't actually want to send that cell.
589                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
590                    params.padding_negotiate = None;
591                }
592
593                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
594                    Ok(()) => {}
595                    Err(ChannelClosed) => return,
596                }
597
598                mutable.padding = PCS::PaddingConfigured;
599            }
600
601            PCS::PaddingConfigured => {
602                // OK, nothing to do
603            }
604        }
605
606        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
607    }
608
609    /// Reparameterise (update parameters; reconfigure)
610    ///
611    /// Returns `Err` if the channel was closed earlier
612    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
613        let mut mutable = self
614            .mutable
615            .lock()
616            .map_err(|_| internal!("channel details poisoned"))?;
617
618        match &mut mutable.padding {
619            PCS::PaddingConfigured => {
620                self.send_control(CtrlMsg::ConfigUpdate(params))?;
621            }
622            PCS::UsageDoesNotImplyPadding { padding_params } => {
623                padding_params.combine(&params);
624            }
625        }
626
627        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
628        Ok(())
629    }
630
631    /// Update the KIST parameters.
632    ///
633    /// Returns `Err` if the channel is closed.
634    pub fn reparameterize_kist(&self, kist_params: KistParams) -> Result<()> {
635        Ok(self.send_control(CtrlMsg::KistConfigUpdate(kist_params))?)
636    }
637
638    /// Return an error if this channel is somehow mismatched with the
639    /// given target.
640    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
641        check_id_match_helper(&self.peer_id, target)
642    }
643
644    /// Return true if this channel is closed and therefore unusable.
645    pub fn is_closing(&self) -> bool {
646        self.reactor_closed_rx.is_ready()
647    }
648
649    /// If the channel is not in use, return the amount of time
650    /// it has had with no circuits.
651    ///
652    /// Return `None` if the channel is currently in use.
653    pub fn duration_unused(&self) -> Option<std::time::Duration> {
654        self.details
655            .unused_since
656            .time_since_update()
657            .map(Into::into)
658    }
659
660    /// Return a new [`ChannelSender`] to transmit cells on this channel.
661    pub(crate) fn sender(&self) -> ChannelSender {
662        ChannelSender {
663            cell_tx: self.cell_tx.clone(),
664            reactor_closed_rx: self.reactor_closed_rx.clone(),
665            unique_id: self.unique_id,
666        }
667    }
668
669    /// Return a newly allocated PendingClientCirc object with
670    /// a corresponding circuit reactor. A circuit ID is allocated, but no
671    /// messages are sent, and no cryptography is done.
672    ///
673    /// To use the results of this method, call Reactor::run() in a
674    /// new task, then use the methods of
675    /// [crate::tunnel::circuit::PendingClientCirc] to build the circuit.
676    pub async fn new_circ(
677        self: &Arc<Self>,
678    ) -> Result<(circuit::PendingClientCirc, tunnel::reactor::Reactor)> {
679        if self.is_closing() {
680            return Err(ChannelClosed.into());
681        }
682
683        let time_prov = self.cell_tx.time_provider().clone();
684        let memquota = CircuitAccount::new(&self.details.memquota)?;
685
686        // TODO: blocking is risky, but so is unbounded.
687        let (sender, receiver) =
688            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
689        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
690
691        let (tx, rx) = oneshot::channel();
692        self.send_control(CtrlMsg::AllocateCircuit {
693            created_sender: createdsender,
694            sender,
695            tx,
696        })?;
697        let (id, circ_unique_id) = rx.await.map_err(|_| ChannelClosed)??;
698
699        trace!("{}: Allocated CircId {}", circ_unique_id, id);
700
701        Ok(circuit::PendingClientCirc::new(
702            id,
703            self.clone(),
704            createdreceiver,
705            receiver,
706            circ_unique_id,
707            time_prov,
708            memquota,
709        ))
710    }
711
712    /// Shut down this channel immediately, along with all circuits that
713    /// are using it.
714    ///
715    /// Note that other references to this channel may exist.  If they
716    /// do, they will stop working after you call this function.
717    ///
718    /// It's not necessary to call this method if you're just done
719    /// with a channel: the channel should close on its own once nothing
720    /// is using it any more.
721    pub fn terminate(&self) {
722        let _ = self.send_control(CtrlMsg::Shutdown);
723    }
724
725    /// Tell the reactor that the circuit with the given ID has gone away.
726    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
727        self.send_control(CtrlMsg::CloseCircuit(circid))?;
728        Ok(())
729    }
730
731    /// Return a future that will resolve once this channel has closed.
732    ///
733    /// Note that this method does not _cause_ the channel to shut down on its own.
734    pub fn wait_for_close(
735        &self,
736    ) -> impl Future<Output = StdResult<CloseInfo, ClosedUnexpectedly>> + Send + Sync + 'static
737    {
738        self.reactor_closed_rx
739            .clone()
740            .into_future()
741            .map(|recv| match recv {
742                Ok(Ok(info)) => Ok(info),
743                Ok(Err(e)) => Err(ClosedUnexpectedly::ReactorError(e)),
744                Err(oneshot_broadcast::SenderDropped) => Err(ClosedUnexpectedly::ReactorDropped),
745            })
746    }
747
748    /// Make a new fake reactor-less channel.  For testing only, obviously.
749    ///
750    /// Returns the receiver end of the control message mpsc.
751    ///
752    /// Suitable for external callers who want to test behaviour
753    /// of layers including the logic in the channel frontend
754    /// (`Channel` object methods).
755    //
756    // This differs from test::fake_channel as follows:
757    //  * It returns the mpsc Receiver
758    //  * It does not require explicit specification of details
759    #[cfg(feature = "testing")]
760    pub fn new_fake() -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
761        let (control, control_recv) = mpsc::unbounded();
762        let details = fake_channel_details();
763
764        let unique_id = UniqId::new();
765        let peer_id = OwnedChanTarget::builder()
766            .ed_identity([6_u8; 32].into())
767            .rsa_identity([10_u8; 20].into())
768            .build()
769            .expect("Couldn't construct peer id");
770
771        // This will make rx trigger immediately.
772        let (_tx, rx) = oneshot_broadcast::channel();
773
774        let channel = Channel {
775            control,
776            cell_tx: fake_mpsc().0,
777            reactor_closed_rx: rx,
778            unique_id,
779            peer_id,
780            clock_skew: ClockSkew::None,
781            opened_at: coarsetime::Instant::now(),
782            mutable: Default::default(),
783            details,
784        };
785        (channel, control_recv)
786    }
787}
788
789/// If there is any identity in `wanted_ident` that is not present in
790/// `my_ident`, return a ChanMismatch error.
791///
792/// This is a helper for [`Channel::check_match`] and
793/// [`UnverifiedChannel::check_internal`].
794fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
795where
796    T: HasRelayIds + ?Sized,
797    U: HasRelayIds + ?Sized,
798{
799    for desired in wanted_ident.identities() {
800        let id_type = desired.id_type();
801        match my_ident.identity(id_type) {
802            Some(actual) if actual == desired => {}
803            Some(actual) => {
804                return Err(Error::ChanMismatch(format!(
805                    "Identity {} does not match target {}",
806                    sv(actual),
807                    sv(desired)
808                )));
809            }
810            None => {
811                return Err(Error::ChanMismatch(format!(
812                    "Peer does not have {} identity",
813                    id_type
814                )))
815            }
816        }
817    }
818    Ok(())
819}
820
821impl HasRelayIds for Channel {
822    fn identity(
823        &self,
824        key_type: tor_linkspec::RelayIdType,
825    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
826        self.peer_id.identity(key_type)
827    }
828}
829
/// Status reported for a channel that was closed successfully.
///
/// **Note:** This type currently carries no data,
/// but it may gain some in the future.
// Nothing comes to mind that waiters would need to be told,
// but having a dedicated type lets us add information later
// without a backwards-incompatible change.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CloseInfo;
839
840/// The status of a channel which closed unexpectedly.
841#[derive(Clone, Debug, thiserror::Error)]
842#[non_exhaustive]
843pub enum ClosedUnexpectedly {
844    /// The channel reactor was dropped or panicked before completing.
845    #[error("channel reactor was dropped or panicked before completing")]
846    ReactorDropped,
847    /// The channel reactor had an internal error.
848    #[error("channel reactor had an internal error")]
849    ReactorError(Error),
850}
851
/// Build a throwaway [`ChannelDetails`] wrapped in an [`Arc`] (for testing only!)
///
/// The `unused_since` timestamp is freshly constructed and the memory quota
/// handle comes from `crate::util::fake_mq`.
#[cfg(any(test, feature = "testing"))]
fn fake_channel_details() -> Arc<ChannelDetails> {
    let details = ChannelDetails {
        unused_since: AtomicOptTimestamp::new(),
        memquota: crate::util::fake_mq(),
    };
    Arc::new(details)
}
862
/// Make a fake MPSC queue for testing, of the same type that Channels use
///
/// Returns the `(Sender, Receiver)` pair produced by `crate::fake_mpsc`
/// when asked for a buffer of `CHANNEL_BUFFER_SIZE`.
#[cfg(any(test, feature = "testing"))] // Used by Channel::new_fake which is also feature=testing
pub(crate) fn fake_mpsc() -> (
    mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
    mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
) {
    // Use the same buffer size constant as the rest of this module.
    let buffer_size = CHANNEL_BUFFER_SIZE;
    crate::fake_mpsc(buffer_size)
}
871
872#[cfg(test)]
873pub(crate) mod test {
874    // Most of this module is tested via tests that also check on the
875    // reactor code; there are just a few more cases to examine here.
876    #![allow(clippy::unwrap_used)]
877    use super::*;
878    use crate::channel::codec::test::MsgBuf;
879    pub(crate) use crate::channel::reactor::test::new_reactor;
880    use crate::util::fake_mq;
881    use tor_cell::chancell::msg::HandshakeType;
882    use tor_cell::chancell::{msg, AnyChanCell};
883    use tor_rtcompat::PreferredRuntime;
884
885    /// Make a new fake reactor-less channel.  For testing only, obviously.
886    pub(crate) fn fake_channel(details: Arc<ChannelDetails>) -> Channel {
887        let unique_id = UniqId::new();
888        let peer_id = OwnedChanTarget::builder()
889            .ed_identity([6_u8; 32].into())
890            .rsa_identity([10_u8; 20].into())
891            .build()
892            .expect("Couldn't construct peer id");
893        // This will make rx trigger immediately.
894        let (_tx, rx) = oneshot_broadcast::channel();
895        Channel {
896            control: mpsc::unbounded().0,
897            cell_tx: fake_mpsc().0,
898            reactor_closed_rx: rx,
899            unique_id,
900            peer_id,
901            clock_skew: ClockSkew::None,
902            opened_at: coarsetime::Instant::now(),
903            mutable: Default::default(),
904            details,
905        }
906    }
907
908    #[test]
909    fn send_bad() {
910        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
911            use std::error::Error;
912            let chan = fake_channel(fake_channel_details());
913
914            let cell = AnyChanCell::new(CircId::new(7), msg::Created2::new(&b"hihi"[..]).into());
915            let e = chan.sender().check_cell(&cell);
916            assert!(e.is_err());
917            assert!(format!("{}", e.unwrap_err().source().unwrap())
918                .contains("Can't send CREATED2 cell on client channel"));
919            let cell = AnyChanCell::new(None, msg::Certs::new_empty().into());
920            let e = chan.sender().check_cell(&cell);
921            assert!(e.is_err());
922            assert!(format!("{}", e.unwrap_err().source().unwrap())
923                .contains("Can't send CERTS cell after handshake is done"));
924
925            let cell = AnyChanCell::new(
926                CircId::new(5),
927                msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
928            );
929            let e = chan.sender().check_cell(&cell);
930            assert!(e.is_ok());
931            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
932            // let got = output.next().await.unwrap();
933            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
934        });
935    }
936
937    #[test]
938    fn chanbuilder() {
939        let rt = PreferredRuntime::create().unwrap();
940        let mut builder = ChannelBuilder::default();
941        builder.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec!["127.0.0.1:9001"
942            .parse()
943            .unwrap()]));
944        let tls = MsgBuf::new(&b""[..]);
945        let _outbound = builder.launch(tls, rt, fake_mq());
946    }
947
948    #[test]
949    fn check_match() {
950        let chan = fake_channel(fake_channel_details());
951
952        let t1 = OwnedChanTarget::builder()
953            .ed_identity([6; 32].into())
954            .rsa_identity([10; 20].into())
955            .build()
956            .unwrap();
957        let t2 = OwnedChanTarget::builder()
958            .ed_identity([1; 32].into())
959            .rsa_identity([3; 20].into())
960            .build()
961            .unwrap();
962        let t3 = OwnedChanTarget::builder()
963            .ed_identity([3; 32].into())
964            .rsa_identity([2; 20].into())
965            .build()
966            .unwrap();
967
968        assert!(chan.check_match(&t1).is_ok());
969        assert!(chan.check_match(&t2).is_err());
970        assert!(chan.check_match(&t3).is_err());
971    }
972
973    #[test]
974    fn unique_id() {
975        let ch1 = fake_channel(fake_channel_details());
976        let ch2 = fake_channel(fake_channel_details());
977        assert_ne!(ch1.unique_id(), ch2.unique_id());
978    }
979
980    #[test]
981    fn duration_unused_at() {
982        let details = fake_channel_details();
983        let ch = fake_channel(Arc::clone(&details));
984        details.unused_since.update();
985        assert!(ch.duration_unused().is_some());
986    }
987}