1
//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2
//!
3
//! Channels form the basis of the rest of the Tor protocol: they are
4
//! the only way for two Tor instances to talk.
5
//!
6
//! Channels are not useful directly for application requests: after
7
//! making a channel, it needs to get used to build circuits, and the
8
//! circuits are used to anonymize streams.  The streams are the
9
//! objects corresponding to directory requests.
10
//!
11
//! In general, you shouldn't try to manage channels on your own;
12
//! use the `tor-chanmgr` crate instead.
13
//!
14
//! To launch a channel:
15
//!
16
//!  * Create a TLS connection as an object that implements AsyncRead +
17
//!    AsyncWrite + StreamOps, and pass it to a [ChannelBuilder].  This will
18
//!    yield an [handshake::ClientInitiatorHandshake] that represents
19
//!    the state of the handshake.
20
//!  * Call [handshake::ClientInitiatorHandshake::connect] on the result
21
//!    to negotiate the rest of the handshake.  This will verify
22
//!    syntactic correctness of the handshake, but not its cryptographic
23
//!    integrity.
24
//!  * Call [handshake::UnverifiedChannel::check] on the result.  This
25
//!    finishes the cryptographic checks.
26
//!  * Call [handshake::VerifiedChannel::finish] on the result. This
27
//!    completes the handshake and produces an open channel and Reactor.
28
//!  * Launch an asynchronous task to call the reactor's run() method.
29
//!
30
//! One you have a running channel, you can create circuits on it with
31
//! its [Channel::new_tunnel] method.  See
32
//! [crate::client::circuit::PendingClientTunnel] for information on how to
33
//! proceed from there.
34
//!
35
//! # Design
36
//!
37
//! For now, this code splits the channel into two pieces: a "Channel"
38
//! object that can be used by circuits to write cells onto the
39
//! channel, and a "Reactor" object that runs as a task in the
40
//! background, to read channel cells and pass them to circuits as
41
//! appropriate.
42
//!
43
//! I'm not at all sure that's the best way to do that, but it's what
44
//! I could think of.
45
//!
46
//! # Limitations
47
//!
48
//! TODO: There is no rate limiting or fairness.
49

            
50
/// The size of the channel buffer for communication between `Channel` and its reactor.
51
pub const CHANNEL_BUFFER_SIZE: usize = 128;
52

            
53
mod circmap;
54
mod handler;
55
mod handshake;
56
pub mod kist;
57
mod msg;
58
pub mod padding;
59
pub mod params;
60
mod reactor;
61
mod unique_id;
62

            
63
pub use crate::channel::params::*;
64
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, Reactor};
65
pub use crate::channel::unique_id::UniqId;
66
use crate::client::circuit::padding::{PaddingController, QueuedCellPaddingInfo};
67
use crate::client::circuit::{PendingClientTunnel, TimeoutEstimator};
68
use crate::memquota::{ChannelAccount, CircuitAccount, SpecificAccount as _};
69
use crate::util::err::ChannelClosed;
70
use crate::util::oneshot_broadcast;
71
use crate::util::ts::AtomicOptTimestamp;
72
use crate::{ClockSkew, client};
73
use crate::{Error, Result};
74
use cfg_if::cfg_if;
75
use reactor::BoxedChannelStreamOps;
76
use safelog::sensitive as sv;
77
use std::future::{Future, IntoFuture};
78
use std::pin::Pin;
79
use std::sync::{Mutex, MutexGuard};
80
use std::time::Duration;
81
use tor_cell::chancell::ChanMsg;
82
use tor_cell::chancell::msg::AnyChanMsg;
83
use tor_cell::chancell::{AnyChanCell, CircId, msg::PaddingNegotiate};
84
use tor_cell::restricted_msg;
85
use tor_error::internal;
86
use tor_linkspec::{HasRelayIds, OwnedChanTarget};
87
use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
88
use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider, StreamOps};
89

            
90
#[cfg(feature = "circ-padding")]
91
use tor_async_utils::counting_streams::{self, CountingSink, CountingStream};
92

            
93
/// Imports that are re-exported pub if feature `testing` is enabled
94
///
95
/// Putting them together in a little module like this allows us to select the
96
/// visibility for all of these things together.
97
mod testing_exports {
98
    #![allow(unreachable_pub)]
99
    pub use super::reactor::CtrlMsg;
100
    pub use crate::client::circuit::celltypes::CreateResponse;
101
}
102
#[cfg(feature = "testing")]
103
pub use testing_exports::*;
104
#[cfg(not(feature = "testing"))]
105
use testing_exports::*;
106

            
107
use asynchronous_codec;
108
use futures::channel::mpsc;
109
use futures::io::{AsyncRead, AsyncWrite};
110
use oneshot_fused_workaround as oneshot;
111

            
112
use educe::Educe;
113
use futures::{FutureExt as _, Sink};
114
use std::result::Result as StdResult;
115
use std::sync::Arc;
116
use std::task::{Context, Poll};
117

            
118
use tracing::trace;
119

            
120
// reexport
121
use crate::channel::unique_id::CircUniqIdContext;
122
pub use handshake::{ClientInitiatorHandshake, UnverifiedChannel, VerifiedChannel};
123

            
124
use kist::KistParams;
125

            
126
restricted_msg! {
127
    /// A channel message that we allow to be sent from a server to a client on
128
    /// an open channel.
129
    ///
130
    /// (An Open channel here is one on which we have received a NETINFO cell.)
131
    ///
132
    /// Note that an unexpected message type will _not_ be ignored: instead, it
133
    /// will cause the channel to shut down.
134
    #[derive(Clone, Debug)]
135
    pub(crate) enum OpenChanMsgS2C : ChanMsg {
136
        Padding,
137
        Vpadding,
138
        // Not Create*, since we are not a relay.
139
        // Not Created, since we never send CREATE.
140
        CreatedFast,
141
        Created2,
142
        Relay,
143
        // Not RelayEarly, since we are a client.
144
        Destroy,
145
        // Not PaddingNegotiate, since we are not a relay.
146
        // Not Versions, Certs, AuthChallenge, Authenticate: they are for handshakes.
147
        // Not Authorize: it is reserved, but unused.
148
    }
149
}
150

            
151
/// This indicate what type of channel it is. It allows us to decide for the correct channel cell
152
/// state machines and authentication process (if any).
153
///
154
/// It is created when a channel is requested for creation which means the subsystem wanting to
155
/// open a channel needs to know what type it wants.
156
#[derive(Clone, Copy, Debug, derive_more::Display)]
157
#[non_exhaustive]
158
pub enum ChannelType {
159
    /// Client: Initiated from a client to a relay. Client is unauthenticated and relay is
160
    /// authenticated.
161
    ClientInitiator,
162
    /// Relay: Initiating as a relay to a relay. Both sides are authenticated.
163
    RelayInitiator,
164
    /// Relay: Responding as a relay to a relay or client. Authenticated or Unauthenticated.
165
    RelayResponder {
166
        /// Indicate if the channel is authenticated. Responding as a relay can be either from a
167
        /// Relay (authenticated) or a Client/Bridge (Unauthenticated). We only know this
168
        /// information once the handshake is completed.
169
        ///
170
        /// This side is always authenticated, the other side can be if a relay or not if
171
        /// bridge/client. This is set to false unless we end up authenticating the other side
172
        /// meaning a relay.
173
        authenticated: bool,
174
    },
175
}
176

            
177
impl ChannelType {
178
    /// Return true if this channel type is an initiator.
179
    #[expect(unused)] // TODO: Remove once used.
180
    pub(crate) fn is_initiator(&self) -> bool {
181
        matches!(self, Self::ClientInitiator | Self::RelayInitiator)
182
    }
183
}
184

            
185
/// A channel cell frame used for sending and receiving cells on a channel. The handler takes care
186
/// of the cell codec transition depending in which state the channel is.
187
///
188
/// ChannelFrame is used to basically handle all in and outbound cells on a channel for its entire
189
/// lifetime.
190
pub(crate) type ChannelFrame<T> = asynchronous_codec::Framed<T, handler::ChannelCellHandler>;
191

            
192
/// An entry in a channel's queue of cells to be flushed.
193
pub(crate) type ChanCellQueueEntry = (AnyChanCell, Option<QueuedCellPaddingInfo>);
194

            
195
/// Helper: Return a new channel frame [ChannelFrame] from an object implementing AsyncRead + AsyncWrite. In the
196
/// tor context, it is always a TLS stream.
197
///
198
/// The ty (type) argument needs to be able to transform into a [handler::ChannelCellHandler] which would
199
/// generally be a [ChannelType].
200
74
pub(crate) fn new_frame<T, I>(tls: T, ty: I) -> ChannelFrame<T>
201
74
where
202
74
    T: AsyncRead + AsyncWrite,
203
74
    I: Into<handler::ChannelCellHandler>,
204
{
205
74
    asynchronous_codec::Framed::new(tls, ty.into())
206
74
}
207

            
208
/// An open client channel, ready to send and receive Tor cells.
209
///
210
/// A channel is a direct connection to a Tor relay, implemented using TLS.
211
///
212
/// This struct is a frontend that can be used to send cells
213
/// and otherwise control the channel.  The main state is
214
/// in the Reactor object.
215
///
216
/// (Users need a mutable reference because of the types in `Sink`, and
217
/// ultimately because `cell_tx: mpsc::Sender` doesn't work without mut.
218
///
219
/// # Channel life cycle
220
///
221
/// Channels can be created directly here through the [`ChannelBuilder`] API.
222
/// For a higher-level API (with better support for TLS, pluggable transports,
223
/// and channel reuse) see the `tor-chanmgr` crate.
224
///
225
/// After a channel is created, it will persist until it is closed in one of
226
/// four ways:
227
///    1. A remote error occurs.
228
///    2. The other side of the channel closes the channel.
229
///    3. Someone calls [`Channel::terminate`] on the channel.
230
///    4. The last reference to the `Channel` is dropped. (Note that every circuit
231
///       on a `Channel` keeps a reference to it, which will in turn keep the
232
///       channel from closing until all those circuits have gone away.)
233
///
234
/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
235
/// will just be unusable for most purposes.  Most operations on it will fail
236
/// with an error.
237
#[derive(Debug)]
238
pub struct Channel {
239
    /// The channel type.
240
    #[expect(unused)] // TODO: Remove once used.
241
    channel_type: ChannelType,
242
    /// A channel used to send control messages to the Reactor.
243
    control: mpsc::UnboundedSender<CtrlMsg>,
244
    /// A channel used to send cells to the Reactor.
245
    cell_tx: CellTx,
246

            
247
    /// A receiver that indicates whether the channel is closed.
248
    ///
249
    /// Awaiting will return a `CancelledError` event when the reactor is dropped.
250
    /// Read to decide if operations may succeed, and is returned by `wait_for_close`.
251
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
252

            
253
    /// Padding controller, used to report when data is queued for this channel.
254
    padding_ctrl: PaddingController,
255

            
256
    /// A unique identifier for this channel.
257
    unique_id: UniqId,
258
    /// Validated identity and address information for this peer.
259
    peer_id: OwnedChanTarget,
260
    /// The declared clock skew on this channel, at the time when this channel was
261
    /// created.
262
    clock_skew: ClockSkew,
263
    /// The time when this channel was successfully completed
264
    opened_at: coarsetime::Instant,
265
    /// Mutable state used by the `Channel.
266
    mutable: Mutex<MutableDetails>,
267

            
268
    /// Information shared with the reactor
269
    details: Arc<ChannelDetails>,
270
}
271

            
272
/// This is information shared between the reactor and the frontend (`Channel` object).
273
///
274
/// `control` can't be here because we rely on it getting dropped when the last user goes away.
275
#[derive(Debug)]
276
pub(crate) struct ChannelDetails {
277
    /// Since when the channel became unused.
278
    ///
279
    /// If calling `time_since_update` returns None,
280
    /// this channel is still in use by at least one circuit.
281
    ///
282
    /// Set by reactor when a circuit is added or removed.
283
    /// Read from `Channel::duration_unused`.
284
    unused_since: AtomicOptTimestamp,
285
    /// Memory quota account
286
    ///
287
    /// This is here partly because we need to ensure it lives as long as the channel,
288
    /// as otherwise the memquota system will tear the account down.
289
    #[allow(dead_code)]
290
    memquota: ChannelAccount,
291
}
292

            
293
/// Mutable details (state) used by the `Channel` (frontend)
294
#[derive(Debug, Default)]
295
struct MutableDetails {
296
    /// State used to control padding
297
    padding: PaddingControlState,
298
}
299

            
300
/// State used to control padding
301
///
302
/// We store this here because:
303
///
304
///  1. It must be per-channel, because it depends on channel usage.  So it can't be in
305
///     (for example) `ChannelPaddingInstructionsUpdate`.
306
///
307
///  2. It could be in the channel manager's per-channel state but (for code flow reasons
308
///     there, really) at the point at which the channel manager concludes for a pending
309
///     channel that it ought to update the usage, it has relinquished the lock on its own data
310
///     structure.
311
///     And there is actually no need for this to be global: a per-channel lock is better than
312
///     reacquiring the global one.
313
///
314
///  3. It doesn't want to be in the channel reactor since that's super hot.
315
///
316
/// See also the overview at [`tor_proto::channel::padding`](padding)
317
#[derive(Debug, Educe)]
318
#[educe(Default)]
319
enum PaddingControlState {
320
    /// No usage of this channel, so far, implies sending or negotiating channel padding.
321
    ///
322
    /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
323
    /// with the following consequences:
324
    ///
325
    ///  * We don't enable our own padding.
326
    ///  * We don't do any work to change the timeout distribution in the padding timer,
327
    ///    (which is fine since this timer is not enabled).
328
    ///  * We don't send any PADDING_NEGOTIATE cells.  The peer is supposed to come to the
329
    ///    same conclusions as us, based on channel usage: it should also not send padding.
330
    #[educe(Default)]
331
    UsageDoesNotImplyPadding {
332
        /// The last padding parameters (from reparameterize)
333
        ///
334
        /// We keep this so that we can send it if and when
335
        /// this channel starts to be used in a way that implies (possibly) sending padding.
336
        padding_params: ChannelPaddingInstructionsUpdates,
337
    },
338

            
339
    /// Some usage of this channel implies possibly sending channel padding
340
    ///
341
    /// The required padding timer, negotiation cell, etc.,
342
    /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
343
    ///
344
    /// Once we have set this variant, it remains this way forever for this channel,
345
    /// (the spec speaks of channels "only used for" certain purposes not getting padding).
346
    PaddingConfigured,
347
}
348

            
349
use PaddingControlState as PCS;
350

            
351
cfg_if! {
352
    if #[cfg(feature="circ-padding")] {
353
        /// Implementation type for a ChannelSender.
354
        type CellTx = CountingSink<mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>>;
355

            
356
        /// Implementation type for a cell queue held by a reactor.
357
        type CellRx = CountingStream<mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>>;
358
    } else {
359
        /// Implementation type for a ChannelSender.
360
        type CellTx = mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>;
361

            
362
        /// Implementation type for a cell queue held by a reactor.
363
        type CellRx = mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>;
364
    }
365
}
366

            
367
/// A handle to a [`Channel`]` that can be used, by circuits, to send channel cells.
368
#[derive(Debug)]
369
pub(crate) struct ChannelSender {
370
    /// MPSC sender to send cells.
371
    cell_tx: CellTx,
372
    /// A receiver used to check if the channel is closed.
373
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
374
    /// Unique ID for this channel. For logging.
375
    unique_id: UniqId,
376
    /// Padding controller for this channel:
377
    /// used to report when we queue data that will eventually wind up on the channel.
378
    padding_ctrl: PaddingController,
379
}
380

            
381
impl ChannelSender {
382
    /// Check whether a cell type is permissible to be _sent_ on an
383
    /// open client channel.
384
4672
    fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
385
        use tor_cell::chancell::msg::AnyChanMsg::*;
386
4672
        let msg = cell.msg();
387
4672
        match msg {
388
12
            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
389
12
                "Can't send {} cell on client channel",
390
12
                msg.cmd()
391
12
            ))),
392
            Certs(_) | Versions(_) | Authenticate(_) | AuthChallenge(_) | Netinfo(_) => {
393
12
                Err(Error::from(internal!(
394
12
                    "Can't send {} cell after handshake is done",
395
12
                    msg.cmd()
396
12
                )))
397
            }
398
4648
            _ => Ok(()),
399
        }
400
4672
    }
401

            
402
    /// Obtain a reference to the `ChannelSender`'s [`DynTimeProvider`]
403
    ///
404
    /// (This can sometimes be used to avoid having to keep
405
    /// a separate clone of the time provider.)
406
72
    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
407
        cfg_if! {
408
            if #[cfg(feature="circ-padding")] {
409
72
                self.cell_tx.inner().time_provider()
410
            } else {
411
                self.cell_tx.time_provider()
412
            }
413
        }
414
72
    }
415

            
416
    /// Return an approximate count of the number of outbound cells queued for this channel.
417
    ///
418
    /// This count is necessarily approximate,
419
    /// because the underlying count can be modified by other senders and receivers
420
    /// between when this method is called and when its return value is used.
421
    ///
422
    /// Does not include cells that have already been passed to the TLS connection.
423
    ///
424
    /// Circuit padding uses this count to determine
425
    /// when messages are already outbound for the first hop of a circuit.
426
    #[cfg(feature = "circ-padding")]
427
    pub(crate) fn approx_count(&self) -> usize {
428
        self.cell_tx.approx_count()
429
    }
430

            
431
    /// Note that a cell has been queued that will eventually be placed onto this sender.
432
    ///
433
    /// We use this as an input for padding machines.
434
4636
    pub(crate) fn note_cell_queued(&self) {
435
4636
        self.padding_ctrl.queued_data(crate::HopNum::from(0));
436
4636
    }
437
}
438

            
439
impl Sink<ChanCellQueueEntry> for ChannelSender {
440
    type Error = Error;
441

            
442
19490
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
443
19490
        let this = self.get_mut();
444
19490
        Pin::new(&mut this.cell_tx)
445
19490
            .poll_ready(cx)
446
19493
            .map_err(|_| ChannelClosed.into())
447
19490
    }
448

            
449
4636
    fn start_send(self: Pin<&mut Self>, cell: ChanCellQueueEntry) -> Result<()> {
450
4636
        let this = self.get_mut();
451
4636
        if this.reactor_closed_rx.is_ready() {
452
            return Err(ChannelClosed.into());
453
4636
        }
454
4636
        this.check_cell(&cell.0)?;
455
        {
456
            use tor_cell::chancell::msg::AnyChanMsg::*;
457
4636
            match cell.0.msg() {
458
4516
                Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
459
120
                _ => trace!(
460
                    channel_id = %this.unique_id,
461
                    "Sending {} for {}",
462
                    cell.0.msg().cmd(),
463
                    CircId::get_or_zero(cell.0.circid())
464
                ),
465
            }
466
        }
467

            
468
4636
        Pin::new(&mut this.cell_tx)
469
4636
            .start_send(cell)
470
4636
            .map_err(|_| ChannelClosed.into())
471
4636
    }
472

            
473
48
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
474
48
        let this = self.get_mut();
475
48
        Pin::new(&mut this.cell_tx)
476
48
            .poll_flush(cx)
477
48
            .map_err(|_| ChannelClosed.into())
478
48
    }
479

            
480
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
481
        let this = self.get_mut();
482
        Pin::new(&mut this.cell_tx)
483
            .poll_close(cx)
484
            .map_err(|_| ChannelClosed.into())
485
    }
486
}
487

            
488
/// Structure for building and launching a Tor channel.
489
#[derive(Default)]
490
pub struct ChannelBuilder {
491
    /// If present, a description of the address we're trying to connect to,
492
    /// and the way in which we are trying to connect to it.
493
    ///
494
    /// TODO: at some point, check this against the addresses in the netinfo
495
    /// cell too.
496
    target: Option<tor_linkspec::ChannelMethod>,
497
}
498

            
499
impl ChannelBuilder {
500
    /// Construct a new ChannelBuilder.
501
47
    pub fn new() -> Self {
502
47
        ChannelBuilder::default()
503
47
    }
504

            
505
    /// Set the declared target method of this channel to correspond to a direct
506
    /// connection to a given socket address.
507
    #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
508
    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
509
        self.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![target]));
510
    }
511

            
512
    /// Set the declared target method of this channel.
513
    ///
514
    /// Note that nothing enforces the correctness of this method: it
515
    /// doesn't have to match the real method used to create the TLS
516
    /// stream.
517
49
    pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
518
49
        self.target = Some(target);
519
49
    }
520

            
521
    /// Launch a new client handshake over a TLS stream.
522
    ///
523
    /// After calling this function, you'll need to call `connect()` on
524
    /// the result to start the handshake.  If that succeeds, you'll have
525
    /// authentication info from the relay: call `check()` on the result
526
    /// to check that.  Finally, to finish the handshake, call `finish()`
527
    /// on the result of _that_.
528
4
    pub fn launch_client<T, S>(
529
4
        self,
530
4
        tls: T,
531
4
        sleep_prov: S,
532
4
        memquota: ChannelAccount,
533
4
    ) -> ClientInitiatorHandshake<T, S>
534
4
    where
535
4
        T: AsyncRead + AsyncWrite + StreamOps + Send + Unpin + 'static,
536
4
        S: CoarseTimeProvider + SleepProvider,
537
    {
538
4
        handshake::ClientInitiatorHandshake::new(tls, self.target, sleep_prov, memquota)
539
4
    }
540
}
541

            
542
impl Channel {
543
    /// Construct a channel and reactor.
544
    ///
545
    /// Internal method, called to finalize the channel when we've
546
    /// sent our netinfo cell, received the peer's netinfo cell, and
547
    /// we're finally ready to create circuits.
548
    ///
549
    /// Quick note on the allow clippy. This is has one call site so for now, it is fine that we
550
    /// bust the mighty 7 arguments.
551
    #[allow(clippy::too_many_arguments)] // TODO consider if we want a builder
552
476
    fn new<S>(
553
476
        channel_type: ChannelType,
554
476
        link_protocol: u16,
555
476
        sink: BoxedChannelSink,
556
476
        stream: BoxedChannelStream,
557
476
        streamops: BoxedChannelStreamOps,
558
476
        unique_id: UniqId,
559
476
        peer_id: OwnedChanTarget,
560
476
        clock_skew: ClockSkew,
561
476
        sleep_prov: S,
562
476
        memquota: ChannelAccount,
563
476
    ) -> Result<(Arc<Self>, reactor::Reactor<S>)>
564
476
    where
565
476
        S: CoarseTimeProvider + SleepProvider,
566
    {
567
        use circmap::{CircIdRange, CircMap};
568
476
        let circmap = CircMap::new(CircIdRange::High);
569
476
        let dyn_time = DynTimeProvider::new(sleep_prov.clone());
570

            
571
476
        let (control_tx, control_rx) = mpsc::unbounded();
572
476
        let (cell_tx, cell_rx) = mq_queue::MpscSpec::new(CHANNEL_BUFFER_SIZE)
573
476
            .new_mq(dyn_time.clone(), memquota.as_raw_account())?;
574
        #[cfg(feature = "circ-padding")]
575
476
        let (cell_tx, cell_rx) = counting_streams::channel(cell_tx, cell_rx);
576
476
        let unused_since = AtomicOptTimestamp::new();
577
476
        unused_since.update();
578

            
579
476
        let mutable = MutableDetails::default();
580
476
        let (reactor_closed_tx, reactor_closed_rx) = oneshot_broadcast::channel();
581

            
582
476
        let details = ChannelDetails {
583
476
            unused_since,
584
476
            memquota,
585
476
        };
586
476
        let details = Arc::new(details);
587

            
588
        // We might be using experimental maybenot padding; this creates the padding framework for that.
589
        //
590
        // TODO: This backend is currently optimized for circuit padding,
591
        // so it might allocate a bit more than necessary to account for multiple hops.
592
        // We should tune it when we deploy padding in production.
593
476
        let (padding_ctrl, padding_event_stream) =
594
476
            client::circuit::padding::new_padding(DynTimeProvider::new(sleep_prov.clone()));
595

            
596
476
        let channel = Arc::new(Channel {
597
476
            channel_type,
598
476
            control: control_tx,
599
476
            cell_tx,
600
476
            reactor_closed_rx,
601
476
            padding_ctrl: padding_ctrl.clone(),
602
476
            unique_id,
603
476
            peer_id,
604
476
            clock_skew,
605
476
            opened_at: coarsetime::Instant::now(),
606
476
            mutable: Mutex::new(mutable),
607
476
            details: Arc::clone(&details),
608
476
        });
609

            
610
        // We start disabled; the channel manager will `reconfigure` us soon after creation.
611
476
        let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov.clone(), None)?);
612

            
613
        cfg_if! {
614
            if #[cfg(feature = "circ-padding")] {
615
                use crate::util::sink_blocker::{SinkBlocker,CountingPolicy};
616
476
                let sink = SinkBlocker::new(sink, CountingPolicy::new_unlimited());
617
            }
618
        }
619

            
620
476
        let reactor = Reactor {
621
476
            runtime: sleep_prov,
622
476
            control: control_rx,
623
476
            cells: cell_rx,
624
476
            reactor_closed_tx,
625
476
            input: futures::StreamExt::fuse(stream),
626
476
            output: sink,
627
476
            streamops,
628
476
            circs: circmap,
629
476
            circ_unique_id_ctx: CircUniqIdContext::new(),
630
476
            link_protocol,
631
476
            unique_id,
632
476
            details,
633
476
            padding_timer,
634
476
            padding_ctrl,
635
476
            padding_event_stream,
636
476
            padding_blocker: None,
637
476
            special_outgoing: Default::default(),
638
476
        };
639

            
640
476
        Ok((channel, reactor))
641
476
    }
642

            
643
    /// Return a process-unique identifier for this channel.
644
4
    pub fn unique_id(&self) -> UniqId {
645
4
        self.unique_id
646
4
    }
647

            
648
    /// Return a reference to the memory tracking account for this Channel
649
    pub fn mq_account(&self) -> &ChannelAccount {
650
        &self.details.memquota
651
    }
652

            
653
    /// Obtain a reference to the `Channel`'s [`DynTimeProvider`]
654
    ///
655
    /// (This can sometimes be used to avoid having to keep
656
    /// a separate clone of the time provider.)
657
388
    pub fn time_provider(&self) -> &DynTimeProvider {
658
        cfg_if! {
659
            if #[cfg(feature="circ-padding")] {
660
388
                self.cell_tx.inner().time_provider()
661
            } else {
662
                self.cell_tx.time_provider()
663
            }
664
        }
665
388
    }
666

            
667
    /// Return an OwnedChanTarget representing the actual handshake used to
668
    /// create this channel.
669
1176
    pub fn target(&self) -> &OwnedChanTarget {
670
1176
        &self.peer_id
671
1176
    }
672

            
673
    /// Return the amount of time that has passed since this channel became open.
674
    pub fn age(&self) -> Duration {
675
        self.opened_at.elapsed().into()
676
    }
677

            
678
    /// Return a ClockSkew declaring how much clock skew the other side of this channel
679
    /// claimed that we had when we negotiated the connection.
680
    pub fn clock_skew(&self) -> ClockSkew {
681
        self.clock_skew
682
    }
683

            
684
    /// Send a control message
685
2910
    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
686
2910
        self.control
687
2910
            .unbounded_send(msg)
688
2910
            .map_err(|_| ChannelClosed)?;
689
2842
        Ok(())
690
2910
    }
691

            
692
    /// Acquire the lock on `mutable` (and handle any poison error)
693
1128
    fn mutable(&self) -> MutexGuard<MutableDetails> {
694
1128
        self.mutable.lock().expect("channel details poisoned")
695
1128
    }
696

            
697
    /// Specify that this channel should do activities related to channel padding
698
    ///
699
    /// Initially, the channel does nothing related to channel padding:
700
    /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
701
    ///
702
    /// After this function has been called, it will do both,
703
    /// according to the parameters specified through `reparameterize`.
704
    /// Note that this might include *disabling* padding
705
    /// (for example, by sending a `PADDING_NEGOTIATE`).
706
    ///
707
    /// Idempotent.
708
    ///
709
    /// There is no way to undo the effect of this call.
710
1128
    pub fn engage_padding_activities(&self) {
711
1128
        let mut mutable = self.mutable();
712

            
713
1128
        match &mutable.padding {
714
            PCS::UsageDoesNotImplyPadding {
715
1128
                padding_params: params,
716
            } => {
717
                // Well, apparently the channel usage *does* imply padding now,
718
                // so we need to (belatedly) enable the timer,
719
                // send the padding negotiation cell, etc.
720
1128
                let mut params = params.clone();
721

            
722
                // Except, maybe the padding we would be requesting is precisely default,
723
                // so we wouldn't actually want to send that cell.
724
1128
                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
725
                    params.padding_negotiate = None;
726
1128
                }
727

            
728
1128
                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
729
1128
                    Ok(()) => {}
730
                    Err(ChannelClosed) => return,
731
                }
732

            
733
1128
                mutable.padding = PCS::PaddingConfigured;
734
            }
735

            
736
            PCS::PaddingConfigured => {
737
                // OK, nothing to do
738
            }
739
        }
740

            
741
1128
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
742
1128
    }
743

            
744
    /// Reparameterise (update parameters; reconfigure)
745
    ///
746
    /// Returns `Err` if the channel was closed earlier
747
2538
    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
748
2538
        let mut mutable = self
749
2538
            .mutable
750
2538
            .lock()
751
2538
            .map_err(|_| internal!("channel details poisoned"))?;
752

            
753
2538
        match &mut mutable.padding {
754
            PCS::PaddingConfigured => {
755
1410
                self.send_control(CtrlMsg::ConfigUpdate(params))?;
756
            }
757
1128
            PCS::UsageDoesNotImplyPadding { padding_params } => {
758
1128
                padding_params.combine(&params);
759
1128
            }
760
        }
761

            
762
2538
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
763
2538
        Ok(())
764
2538
    }
765

            
766
    /// Update the KIST parameters.
767
    ///
768
    /// Returns `Err` if the channel is closed.
769
    pub fn reparameterize_kist(&self, kist_params: KistParams) -> Result<()> {
770
        Ok(self.send_control(CtrlMsg::KistConfigUpdate(kist_params))?)
771
    }
772

            
773
    /// Return an error if this channel is somehow mismatched with the
774
    /// given target.
775
42
    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
776
42
        check_id_match_helper(&self.peer_id, target)
777
42
    }
778

            
779
    /// Return true if this channel is closed and therefore unusable.
780
119
    pub fn is_closing(&self) -> bool {
781
119
        self.reactor_closed_rx.is_ready()
782
119
    }
783

            
784
    /// If the channel is not in use, return the amount of time
785
    /// it has had with no circuits.
786
    ///
787
    /// Return `None` if the channel is currently in use.
788
179
    pub fn duration_unused(&self) -> Option<std::time::Duration> {
789
179
        self.details
790
179
            .unused_since
791
179
            .time_since_update()
792
179
            .map(Into::into)
793
179
    }
794

            
795
    /// Return a new [`ChannelSender`] to transmit cells on this channel.
796
412
    pub(crate) fn sender(&self) -> ChannelSender {
797
412
        ChannelSender {
798
412
            cell_tx: self.cell_tx.clone(),
799
412
            reactor_closed_rx: self.reactor_closed_rx.clone(),
800
412
            unique_id: self.unique_id,
801
412
            padding_ctrl: self.padding_ctrl.clone(),
802
412
        }
803
412
    }
804

            
805
    /// Return a newly allocated PendingClientTunnel object with
806
    /// a corresponding tunnel reactor. A circuit ID is allocated, but no
807
    /// messages are sent, and no cryptography is done.
808
    ///
809
    /// To use the results of this method, call Reactor::run() in a
810
    /// new task, then use the methods of
811
    /// [crate::client::circuit::PendingClientTunnel] to build the circuit.
812
12
    pub async fn new_tunnel(
813
12
        self: &Arc<Self>,
814
12
        timeouts: Arc<dyn TimeoutEstimator>,
815
18
    ) -> Result<(PendingClientTunnel, client::reactor::Reactor)> {
816
12
        if self.is_closing() {
817
            return Err(ChannelClosed.into());
818
12
        }
819

            
820
12
        let time_prov = self.time_provider().clone();
821
12
        let memquota = CircuitAccount::new(&self.details.memquota)?;
822

            
823
        // TODO: blocking is risky, but so is unbounded.
824
12
        let (sender, receiver) =
825
12
            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
826
12
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
827

            
828
12
        let (tx, rx) = oneshot::channel();
829
12
        self.send_control(CtrlMsg::AllocateCircuit {
830
12
            created_sender: createdsender,
831
12
            sender,
832
12
            tx,
833
12
        })?;
834
12
        let (id, circ_unique_id, padding_ctrl, padding_stream) =
835
12
            rx.await.map_err(|_| ChannelClosed)??;
836

            
837
12
        trace!("{}: Allocated CircId {}", circ_unique_id, id);
838

            
839
12
        Ok(PendingClientTunnel::new(
840
12
            id,
841
12
            self.clone(),
842
12
            createdreceiver,
843
12
            receiver,
844
12
            circ_unique_id,
845
12
            time_prov,
846
12
            memquota,
847
12
            padding_ctrl,
848
12
            padding_stream,
849
12
            timeouts,
850
12
        ))
851
12
    }
852

            
853
    /// Shut down this channel immediately, along with all circuits that
854
    /// are using it.
855
    ///
856
    /// Note that other references to this channel may exist.  If they
857
    /// do, they will stop working after you call this function.
858
    ///
859
    /// It's not necessary to call this method if you're just done
860
    /// with a channel: the channel should close on its own once nothing
861
    /// is using it any more.
862
36
    pub fn terminate(&self) {
863
36
        let _ = self.send_control(CtrlMsg::Shutdown);
864
36
    }
865

            
866
    /// Tell the reactor that the circuit with the given ID has gone away.
867
324
    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
868
324
        self.send_control(CtrlMsg::CloseCircuit(circid))?;
869
256
        Ok(())
870
324
    }
871

            
872
    /// Return a future that will resolve once this channel has closed.
873
    ///
874
    /// Note that this method does not _cause_ the channel to shut down on its own.
875
36
    pub fn wait_for_close(
876
36
        &self,
877
36
    ) -> impl Future<Output = StdResult<CloseInfo, ClosedUnexpectedly>> + Send + Sync + 'static + use<>
878
    {
879
36
        self.reactor_closed_rx
880
36
            .clone()
881
36
            .into_future()
882
48
            .map(|recv| match recv {
883
12
                Ok(Ok(info)) => Ok(info),
884
12
                Ok(Err(e)) => Err(ClosedUnexpectedly::ReactorError(e)),
885
12
                Err(oneshot_broadcast::SenderDropped) => Err(ClosedUnexpectedly::ReactorDropped),
886
36
            })
887
36
    }
888

            
889
    /// Install a [`CircuitPadder`](client::CircuitPadder) for this channel.
890
    ///
891
    /// Replaces any previous padder installed.
892
    #[cfg(feature = "circ-padding-manual")]
893
    pub async fn start_padding(self: &Arc<Self>, padder: client::CircuitPadder) -> Result<()> {
894
        self.set_padder_impl(Some(padder)).await
895
    }
896

            
897
    /// Remove any [`CircuitPadder`](client::CircuitPadder) installed for this channel.
898
    ///
899
    /// Does nothing if there was not a padder installed there.
900
    #[cfg(feature = "circ-padding-manual")]
901
    pub async fn stop_padding(self: &Arc<Self>) -> Result<()> {
902
        self.set_padder_impl(None).await
903
    }
904

            
905
    /// Replace the [`CircuitPadder`](client::CircuitPadder) installed for this channel with `padder`.
906
    #[cfg(feature = "circ-padding-manual")]
907
    async fn set_padder_impl(
908
        self: &Arc<Self>,
909
        padder: Option<client::CircuitPadder>,
910
    ) -> Result<()> {
911
        let (tx, rx) = oneshot::channel();
912
        let msg = CtrlMsg::SetChannelPadder { padder, sender: tx };
913
        self.control
914
            .unbounded_send(msg)
915
            .map_err(|_| Error::ChannelClosed(ChannelClosed))?;
916
        rx.await.map_err(|_| Error::ChannelClosed(ChannelClosed))?
917
    }
918

            
919
    /// Make a new fake reactor-less channel.  For testing only, obviously.
920
    ///
921
    /// Returns the receiver end of the control message mpsc.
922
    ///
923
    /// Suitable for external callers who want to test behaviour
924
    /// of layers including the logic in the channel frontend
925
    /// (`Channel` object methods).
926
    //
927
    // This differs from test::fake_channel as follows:
928
    //  * It returns the mpsc Receiver
929
    //  * It does not require explicit specification of details
930
    #[cfg(feature = "testing")]
931
48
    pub fn new_fake(
932
48
        rt: impl SleepProvider + CoarseTimeProvider,
933
48
        channel_type: ChannelType,
934
48
    ) -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
935
48
        let (control, control_recv) = mpsc::unbounded();
936
48
        let details = fake_channel_details();
937

            
938
48
        let unique_id = UniqId::new();
939
48
        let peer_id = OwnedChanTarget::builder()
940
48
            .ed_identity([6_u8; 32].into())
941
48
            .rsa_identity([10_u8; 20].into())
942
48
            .build()
943
48
            .expect("Couldn't construct peer id");
944

            
945
        // This will make rx trigger immediately.
946
48
        let (_tx, rx) = oneshot_broadcast::channel();
947
48
        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
948

            
949
48
        let channel = Channel {
950
48
            channel_type,
951
48
            control,
952
48
            cell_tx: fake_mpsc().0,
953
48
            reactor_closed_rx: rx,
954
48
            padding_ctrl,
955
48
            unique_id,
956
48
            peer_id,
957
48
            clock_skew: ClockSkew::None,
958
48
            opened_at: coarsetime::Instant::now(),
959
48
            mutable: Default::default(),
960
48
            details,
961
48
        };
962
48
        (channel, control_recv)
963
48
    }
964
}
965

            
966
/// If there is any identity in `wanted_ident` that is not present in
967
/// `my_ident`, return a ChanMismatch error.
968
///
969
/// This is a helper for [`Channel::check_match`] and
970
/// [`UnverifiedChannel::check_internal`].
971
50
fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
972
50
where
973
50
    T: HasRelayIds + ?Sized,
974
50
    U: HasRelayIds + ?Sized,
975
{
976
70
    for desired in wanted_ident.identities() {
977
70
        let id_type = desired.id_type();
978
70
        match my_ident.identity(id_type) {
979
70
            Some(actual) if actual == desired => {}
980
8
            Some(actual) => {
981
8
                return Err(Error::ChanMismatch(format!(
982
8
                    "Identity {} does not match target {}",
983
8
                    sv(actual),
984
8
                    sv(desired)
985
8
                )));
986
            }
987
            None => {
988
                return Err(Error::ChanMismatch(format!(
989
                    "Peer does not have {} identity",
990
                    id_type
991
                )));
992
            }
993
        }
994
    }
995
42
    Ok(())
996
50
}
997

            
998
impl HasRelayIds for Channel {
999
4559
    fn identity(
4559
        &self,
4559
        key_type: tor_linkspec::RelayIdType,
4559
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
4559
        self.peer_id.identity(key_type)
4559
    }
}
/// The status of a channel which was closed successfully.
///
/// **Note:** This doesn't have any associated data,
/// but may be expanded in the future.
// I can't think of any info we'd want to return to waiters,
// but this type leaves the possibility open without requiring any backwards-incompatible changes.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct CloseInfo;
/// The status of a channel which closed unexpectedly.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ClosedUnexpectedly {
    /// The channel reactor was dropped or panicked before completing.
    #[error("channel reactor was dropped or panicked before completing")]
    ReactorDropped,
    /// The channel reactor had an internal error.
    #[error("channel reactor had an internal error")]
    ReactorError(Error),
}
/// Make some fake channel details (for testing only!)
#[cfg(any(test, feature = "testing"))]
1150
fn fake_channel_details() -> Arc<ChannelDetails> {
1150
    let unused_since = AtomicOptTimestamp::new();
1150
    Arc::new(ChannelDetails {
1150
        unused_since,
1150
        memquota: crate::util::fake_mq(),
1150
    })
1150
}
/// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
#[cfg(any(test, feature = "testing"))] // Used by Channel::new_fake which is also feature=testing
1148
pub(crate) fn fake_mpsc() -> (CellTx, CellRx) {
1148
    let (tx, rx) = crate::fake_mpsc(CHANNEL_BUFFER_SIZE);
    #[cfg(feature = "circ-padding")]
1148
    let (tx, rx) = counting_streams::channel(tx, rx);
1148
    (tx, rx)
1148
}
#[cfg(test)]
pub(crate) mod test {
    // Most of this module is tested via tests that also check on the
    // reactor code; there are just a few more cases to examine here.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::channel::handler::test::MsgBuf;
    pub(crate) use crate::channel::reactor::test::{CodecResult, new_reactor};
    use crate::util::fake_mq;
    use tor_cell::chancell::msg::HandshakeType;
    use tor_cell::chancell::{AnyChanCell, msg};
    use tor_rtcompat::{PreferredRuntime, test_with_one_runtime};
    /// Make a new fake reactor-less channel.  For testing only, obviously.
    pub(crate) fn fake_channel(
        rt: impl SleepProvider + CoarseTimeProvider,
        channel_type: ChannelType,
    ) -> Channel {
        let unique_id = UniqId::new();
        let peer_id = OwnedChanTarget::builder()
            .ed_identity([6_u8; 32].into())
            .rsa_identity([10_u8; 20].into())
            .build()
            .expect("Couldn't construct peer id");
        // This will make rx trigger immediately.
        let (_tx, rx) = oneshot_broadcast::channel();
        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
        Channel {
            channel_type,
            control: mpsc::unbounded().0,
            cell_tx: fake_mpsc().0,
            reactor_closed_rx: rx,
            padding_ctrl,
            unique_id,
            peer_id,
            clock_skew: ClockSkew::None,
            opened_at: coarsetime::Instant::now(),
            mutable: Default::default(),
            details: fake_channel_details(),
        }
    }
    #[test]
    fn send_bad() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            use std::error::Error;
            let chan = fake_channel(rt, ChannelType::ClientInitiator);
            let cell = AnyChanCell::new(CircId::new(7), msg::Created2::new(&b"hihi"[..]).into());
            let e = chan.sender().check_cell(&cell);
            assert!(e.is_err());
            assert!(
                format!("{}", e.unwrap_err().source().unwrap())
                    .contains("Can't send CREATED2 cell on client channel")
            );
            let cell = AnyChanCell::new(None, msg::Certs::new_empty().into());
            let e = chan.sender().check_cell(&cell);
            assert!(e.is_err());
            assert!(
                format!("{}", e.unwrap_err().source().unwrap())
                    .contains("Can't send CERTS cell after handshake is done")
            );
            let cell = AnyChanCell::new(
                CircId::new(5),
                msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
            );
            let e = chan.sender().check_cell(&cell);
            assert!(e.is_ok());
            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
            // let got = output.next().await.unwrap();
            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
        });
    }
    #[test]
    fn chanbuilder() {
        let rt = PreferredRuntime::create().unwrap();
        let mut builder = ChannelBuilder::default();
        builder.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![
            "127.0.0.1:9001".parse().unwrap(),
        ]));
        let tls = MsgBuf::new(&b""[..]);
        let _outbound = builder.launch_client(tls, rt, fake_mq());
    }
    #[test]
    fn check_match() {
        test_with_one_runtime!(|rt| async move {
            let chan = fake_channel(rt, ChannelType::ClientInitiator);
            let t1 = OwnedChanTarget::builder()
                .ed_identity([6; 32].into())
                .rsa_identity([10; 20].into())
                .build()
                .unwrap();
            let t2 = OwnedChanTarget::builder()
                .ed_identity([1; 32].into())
                .rsa_identity([3; 20].into())
                .build()
                .unwrap();
            let t3 = OwnedChanTarget::builder()
                .ed_identity([3; 32].into())
                .rsa_identity([2; 20].into())
                .build()
                .unwrap();
            assert!(chan.check_match(&t1).is_ok());
            assert!(chan.check_match(&t2).is_err());
            assert!(chan.check_match(&t3).is_err());
        });
    }
    #[test]
    fn unique_id() {
        test_with_one_runtime!(|rt| async move {
            let ch1 = fake_channel(rt.clone(), ChannelType::ClientInitiator);
            let ch2 = fake_channel(rt, ChannelType::ClientInitiator);
            assert_ne!(ch1.unique_id(), ch2.unique_id());
        });
    }
    #[test]
    fn duration_unused_at() {
        test_with_one_runtime!(|rt| async move {
            let details = fake_channel_details();
            let mut ch = fake_channel(rt, ChannelType::ClientInitiator);
            ch.details = details.clone();
            details.unused_since.update();
            assert!(ch.duration_unused().is_some());
        });
    }
}