1
//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2
//!
3
//! Channels form the basis of the rest of the Tor protocol: they are
4
//! the only way for two Tor instances to talk.
5
//!
6
//! Channels are not useful directly for application requests: after
7
//! making a channel, it needs to get used to build circuits, and the
8
//! circuits are used to anonymize streams.  The streams are the
9
//! objects corresponding to directory requests.
10
//!
11
//! In general, you shouldn't try to manage channels on your own;
12
//! use the `tor-chanmgr` crate instead.
13
//!
14
//! To launch a channel:
15
//!
16
//!  * Create a TLS connection as an object that implements AsyncRead +
17
//!    AsyncWrite + StreamOps, and pass it to a [ChannelBuilder].  This will
18
//!    yield an [handshake::OutboundClientHandshake] that represents
19
//!    the state of the handshake.
20
//!  * Call [handshake::OutboundClientHandshake::connect] on the result
21
//!    to negotiate the rest of the handshake.  This will verify
22
//!    syntactic correctness of the handshake, but not its cryptographic
23
//!    integrity.
24
//!  * Call [handshake::UnverifiedChannel::check] on the result.  This
25
//!    finishes the cryptographic checks.
26
//!  * Call [handshake::VerifiedChannel::finish] on the result. This
27
//!    completes the handshake and produces an open channel and Reactor.
28
//!  * Launch an asynchronous task to call the reactor's run() method.
29
//!
30
//! Once you have a running channel, you can create circuits on it with
31
//! its [Channel::new_circ] method.  See
32
//! [crate::tunnel::circuit::PendingClientCirc] for information on how to
33
//! proceed from there.
34
//!
35
//! # Design
36
//!
37
//! For now, this code splits the channel into two pieces: a "Channel"
38
//! object that can be used by circuits to write cells onto the
39
//! channel, and a "Reactor" object that runs as a task in the
40
//! background, to read channel cells and pass them to circuits as
41
//! appropriate.
42
//!
43
//! I'm not at all sure that's the best way to do that, but it's what
44
//! I could think of.
45
//!
46
//! # Limitations
47
//!
48
//! This is client-only, and only supports link protocol version 4.
49
//!
50
//! TODO: There is no channel padding.
51
//!
52
//! TODO: There is no flow control, rate limiting, queueing, or
53
//! fairness.
54

            
55
/// The size of the channel buffer for communication between `Channel` and its reactor.
///
/// This is the capacity passed to the cell queue that carries outgoing cells
/// from the `Channel` frontend to the reactor.
pub const CHANNEL_BUFFER_SIZE: usize = 128;
57

            
58
mod circmap;
59
mod codec;
60
mod handshake;
61
pub mod kist;
62
pub mod padding;
63
pub mod params;
64
mod reactor;
65
mod unique_id;
66

            
67
pub use crate::channel::params::*;
68
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, Reactor};
69
pub use crate::channel::unique_id::UniqId;
70
use crate::memquota::{ChannelAccount, CircuitAccount, SpecificAccount as _};
71
use crate::util::err::ChannelClosed;
72
use crate::util::oneshot_broadcast;
73
use crate::util::ts::AtomicOptTimestamp;
74
use crate::{tunnel, tunnel::circuit, ClockSkew};
75
use crate::{Error, Result};
76
use reactor::BoxedChannelStreamOps;
77
use safelog::sensitive as sv;
78
use std::future::{Future, IntoFuture};
79
use std::pin::Pin;
80
use std::sync::{Mutex, MutexGuard};
81
use std::time::Duration;
82
use tor_cell::chancell::msg::AnyChanMsg;
83
use tor_cell::chancell::{msg, msg::PaddingNegotiate, AnyChanCell, CircId};
84
use tor_cell::chancell::{ChanCell, ChanMsg};
85
use tor_cell::restricted_msg;
86
use tor_error::internal;
87
use tor_linkspec::{HasRelayIds, OwnedChanTarget};
88
use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
89
use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider, StreamOps};
90

            
91
/// Imports that are re-exported pub if feature `testing` is enabled
92
///
93
/// Putting them together in a little module like this allows us to select the
94
/// visibility for all of these things together.
95
mod testing_exports {
96
    #![allow(unreachable_pub)]
97
    pub use super::reactor::CtrlMsg;
98
    pub use crate::tunnel::circuit::celltypes::CreateResponse;
99
}
100
#[cfg(feature = "testing")]
101
pub use testing_exports::*;
102
#[cfg(not(feature = "testing"))]
103
use testing_exports::*;
104

            
105
use asynchronous_codec;
106
use futures::channel::mpsc;
107
use futures::io::{AsyncRead, AsyncWrite};
108
use oneshot_fused_workaround as oneshot;
109

            
110
use educe::Educe;
111
use futures::{FutureExt as _, Sink};
112
use std::result::Result as StdResult;
113
use std::sync::Arc;
114
use std::task::{Context, Poll};
115

            
116
use tracing::trace;
117

            
118
// reexport
119
use crate::channel::unique_id::CircUniqIdContext;
120
#[cfg(test)]
121
pub(crate) use codec::CodecError;
122
pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
123

            
124
use kist::KistParams;
125

            
126
restricted_msg! {
    /// A channel message that we allow to be sent from a server to a client on
    /// an open channel.
    ///
    /// (An Open channel here is one on which we have received a NETINFO cell.)
    ///
    /// Note that an unexpected message type will _not_ be ignored: instead, it
    /// will cause the channel to shut down.
    #[derive(Clone, Debug)]
    pub(crate) enum OpenChanMsgS2C : ChanMsg {
        Padding,
        Vpadding,
        // Not Create*, since we are not a relay.
        // Not Created, since we never send CREATE.
        CreatedFast,
        Created2,
        Relay,
        // Not RelayEarly, since we are a client.
        Destroy,
        // Not PaddingNegotiate, since we are not a relay.
        // Not Versions, Certs, AuthChallenge, Authenticate: they are for handshakes.
        // Not Authorize: it is reserved, but unused.
    }
}
150

            
151
/// A channel cell that we allot to be sent on an open channel from
152
/// a server to a client.
153
pub(crate) type OpenChanCellS2C = ChanCell<OpenChanMsgS2C>;
154

            
155
/// Type alias: A Sink and Stream that transforms a TLS connection into
156
/// a cell-based communication mechanism.
157
type CellFrame<T> =
158
    asynchronous_codec::Framed<T, crate::channel::codec::ChannelCodec<OpenChanMsgS2C, AnyChanMsg>>;
159

            
160
/// An open client channel, ready to send and receive Tor cells.
161
///
162
/// A channel is a direct connection to a Tor relay, implemented using TLS.
163
///
164
/// This struct is a frontend that can be used to send cells
165
/// and otherwise control the channel.  The main state is
166
/// in the Reactor object.
167
///
168
/// (Users need a mutable reference because of the types in `Sink`, and
169
/// ultimately because `cell_tx: mpsc::Sender` doesn't work without mut.
170
///
171
/// # Channel life cycle
172
///
173
/// Channels can be created directly here through the [`ChannelBuilder`] API.
174
/// For a higher-level API (with better support for TLS, pluggable transports,
175
/// and channel reuse) see the `tor-chanmgr` crate.
176
///
177
/// After a channel is created, it will persist until it is closed in one of
178
/// four ways:
179
///    1. A remote error occurs.
180
///    2. The other side of the channel closes the channel.
181
///    3. Someone calls [`Channel::terminate`] on the channel.
182
///    4. The last reference to the `Channel` is dropped. (Note that every circuit
183
///       on a `Channel` keeps a reference to it, which will in turn keep the
184
///       channel from closing until all those circuits have gone away.)
185
///
186
/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
187
/// will just be unusable for most purposes.  Most operations on it will fail
188
/// with an error.
189
#[derive(Debug)]
190
pub struct Channel {
191
    /// A channel used to send control messages to the Reactor.
192
    control: mpsc::UnboundedSender<CtrlMsg>,
193
    /// A channel used to send cells to the Reactor.
194
    cell_tx: mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
195

            
196
    /// A receiver that indicates whether the channel is closed.
197
    ///
198
    /// Awaiting will return a `CancelledError` event when the reactor is dropped.
199
    /// Read to decide if operations may succeed, and is returned by `wait_for_close`.
200
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
201

            
202
    /// A unique identifier for this channel.
203
    unique_id: UniqId,
204
    /// Validated identity and address information for this peer.
205
    peer_id: OwnedChanTarget,
206
    /// The declared clock skew on this channel, at the time when this channel was
207
    /// created.
208
    clock_skew: ClockSkew,
209
    /// The time when this channel was successfully completed
210
    opened_at: coarsetime::Instant,
211
    /// Mutable state used by the `Channel.
212
    mutable: Mutex<MutableDetails>,
213

            
214
    /// Information shared with the reactor
215
    details: Arc<ChannelDetails>,
216
}
217

            
218
/// This is information shared between the reactor and the frontend (`Channel` object).
219
///
220
/// `control` can't be here because we rely on it getting dropped when the last user goes away.
221
#[derive(Debug)]
222
pub(crate) struct ChannelDetails {
223
    /// Since when the channel became unused.
224
    ///
225
    /// If calling `time_since_update` returns None,
226
    /// this channel is still in use by at least one circuit.
227
    ///
228
    /// Set by reactor when a circuit is added or removed.
229
    /// Read from `Channel::duration_unused`.
230
    unused_since: AtomicOptTimestamp,
231
    /// Memory quota account
232
    ///
233
    /// This is here partly because we need to ensure it lives as long as the channel,
234
    /// as otherwise the memquota system will tear the account down.
235
    #[allow(dead_code)]
236
    memquota: ChannelAccount,
237
}
238

            
239
/// Mutable details (state) used by the `Channel` (frontend)
240
#[derive(Debug, Default)]
241
struct MutableDetails {
242
    /// State used to control padding
243
    padding: PaddingControlState,
244
}
245

            
246
/// State used to control padding
247
///
248
/// We store this here because:
249
///
250
///  1. It must be per-channel, because it depends on channel usage.  So it can't be in
251
///     (for example) `ChannelPaddingInstructionsUpdate`.
252
///
253
///  2. It could be in the channel manager's per-channel state but (for code flow reasons
254
///     there, really) at the point at which the channel manager concludes for a pending
255
///     channel that it ought to update the usage, it has relinquished the lock on its own data
256
///     structure.
257
///     And there is actually no need for this to be global: a per-channel lock is better than
258
///     reacquiring the global one.
259
///
260
///  3. It doesn't want to be in the channel reactor since that's super hot.
261
///
262
/// See also the overview at [`tor_proto::channel::padding`](padding)
263
432
#[derive(Debug, Educe)]
264
#[educe(Default)]
265
enum PaddingControlState {
266
    /// No usage of this channel, so far, implies sending or negotiating channel padding.
267
    ///
268
    /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
269
    /// with the following consequences:
270
    ///
271
    ///  * We don't enable our own padding.
272
    ///  * We don't do any work to change the timeout distribution in the padding timer,
273
    ///    (which is fine since this timer is not enabled).
274
    ///  * We don't send any PADDING_NEGOTIATE cells.  The peer is supposed to come to the
275
    ///    same conclusions as us, based on channel usage: it should also not send padding.
276
    #[educe(Default)]
277
    UsageDoesNotImplyPadding {
278
        /// The last padding parameters (from reparameterize)
279
        ///
280
        /// We keep this so that we can send it if and when
281
        /// this channel starts to be used in a way that implies (possibly) sending padding.
282
        padding_params: ChannelPaddingInstructionsUpdates,
283
    },
284

            
285
    /// Some usage of this channel implies possibly sending channel padding
286
    ///
287
    /// The required padding timer, negotiation cell, etc.,
288
    /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
289
    ///
290
    /// Once we have set this variant, it remains this way forever for this channel,
291
    /// (the spec speaks of channels "only used for" certain purposes not getting padding).
292
    PaddingConfigured,
293
}
294

            
295
use PaddingControlState as PCS;
296

            
297
/// A handle to a [`Channel`]` that can be used, by circuits, to send channel cells.
298
#[derive(Debug)]
299
pub(crate) struct ChannelSender {
300
    /// MPSC sender to send cells.
301
    cell_tx: mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
302
    /// A receiver used to check if the channel is closed.
303
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
304
    /// Unique ID for this channel. For logging.
305
    unique_id: UniqId,
306
}
307

            
308
impl ChannelSender {
309
    /// Check whether a cell type is permissible to be _sent_ on an
310
    /// open client channel.
311
2942
    fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
312
        use msg::AnyChanMsg::*;
313
2942
        let msg = cell.msg();
314
2942
        match msg {
315
8
            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
316
8
                "Can't send {} cell on client channel",
317
8
                msg.cmd()
318
8
            ))),
319
            Certs(_) | Versions(_) | Authenticate(_) | Authorize(_) | AuthChallenge(_)
320
8
            | Netinfo(_) => Err(Error::from(internal!(
321
8
                "Can't send {} cell after handshake is done",
322
8
                msg.cmd()
323
8
            ))),
324
2926
            _ => Ok(()),
325
        }
326
2942
    }
327

            
328
    /// Obtain a reference to the `ChannelSender`'s [`DynTimeProvider`]
329
    ///
330
    /// (This can sometimes be used to avoid having to keep
331
    /// a separate clone of the time provider.)
332
48
    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
333
48
        self.cell_tx.time_provider()
334
48
    }
335
}
336

            
337
impl Sink<AnyChanCell> for ChannelSender {
338
    type Error = Error;
339

            
340
6820
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
341
6820
        let this = self.get_mut();
342
6820
        Pin::new(&mut this.cell_tx)
343
6820
            .poll_ready(cx)
344
6821
            .map_err(|_| ChannelClosed.into())
345
6820
    }
346

            
347
2918
    fn start_send(self: Pin<&mut Self>, cell: AnyChanCell) -> Result<()> {
348
2918
        let this = self.get_mut();
349
2918
        if this.reactor_closed_rx.is_ready() {
350
            return Err(ChannelClosed.into());
351
2918
        }
352
2918
        this.check_cell(&cell)?;
353
        {
354
            use msg::AnyChanMsg::*;
355
2918
            match cell.msg() {
356
2838
                Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
357
80
                _ => trace!(
358
                    "{}: Sending {} for {}",
359
                    this.unique_id,
360
                    cell.msg().cmd(),
361
                    CircId::get_or_zero(cell.circid())
362
                ),
363
            }
364
        }
365

            
366
2918
        Pin::new(&mut this.cell_tx)
367
2918
            .start_send(cell)
368
2920
            .map_err(|_| ChannelClosed.into())
369
2918
    }
370

            
371
920
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
372
920
        let this = self.get_mut();
373
920
        Pin::new(&mut this.cell_tx)
374
920
            .poll_flush(cx)
375
920
            .map_err(|_| ChannelClosed.into())
376
920
    }
377

            
378
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
379
        let this = self.get_mut();
380
        Pin::new(&mut this.cell_tx)
381
            .poll_close(cx)
382
            .map_err(|_| ChannelClosed.into())
383
    }
384
}
385

            
386
/// Structure for building and launching a Tor channel.
387
#[derive(Default)]
388
pub struct ChannelBuilder {
389
    /// If present, a description of the address we're trying to connect to,
390
    /// and the way in which we are trying to connect to it.
391
    ///
392
    /// TODO: at some point, check this against the addresses in the netinfo
393
    /// cell too.
394
    target: Option<tor_linkspec::ChannelMethod>,
395
}
396

            
397
impl ChannelBuilder {
398
    /// Construct a new ChannelBuilder.
399
47
    pub fn new() -> Self {
400
47
        ChannelBuilder::default()
401
47
    }
402

            
403
    /// Set the declared target method of this channel to correspond to a direct
404
    /// connection to a given socket address.
405
    #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
406
    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
407
        self.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![target]));
408
    }
409

            
410
    /// Set the declared target method of this channel.
411
    ///
412
    /// Note that nothing enforces the correctness of this method: it
413
    /// doesn't have to match the real method used to create the TLS
414
    /// stream.
415
49
    pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
416
49
        self.target = Some(target);
417
49
    }
418

            
419
    /// Launch a new client handshake over a TLS stream.
420
    ///
421
    /// After calling this function, you'll need to call `connect()` on
422
    /// the result to start the handshake.  If that succeeds, you'll have
423
    /// authentication info from the relay: call `check()` on the result
424
    /// to check that.  Finally, to finish the handshake, call `finish()`
425
    /// on the result of _that_.
426
4
    pub fn launch<T, S>(
427
4
        self,
428
4
        tls: T,
429
4
        sleep_prov: S,
430
4
        memquota: ChannelAccount,
431
4
    ) -> OutboundClientHandshake<T, S>
432
4
    where
433
4
        T: AsyncRead + AsyncWrite + StreamOps + Send + Unpin + 'static,
434
4
        S: CoarseTimeProvider + SleepProvider,
435
4
    {
436
4
        handshake::OutboundClientHandshake::new(tls, self.target, sleep_prov, memquota)
437
4
    }
438
}
439

            
440
impl Channel {
441
    /// Construct a channel and reactor.
442
    ///
443
    /// Internal method, called to finalize the channel when we've
444
    /// sent our netinfo cell, received the peer's netinfo cell, and
445
    /// we're finally ready to create circuits.
446
    #[allow(clippy::too_many_arguments)] // TODO consider if we want a builder
447
228
    fn new<S>(
448
228
        link_protocol: u16,
449
228
        sink: BoxedChannelSink,
450
228
        stream: BoxedChannelStream,
451
228
        streamops: BoxedChannelStreamOps,
452
228
        unique_id: UniqId,
453
228
        peer_id: OwnedChanTarget,
454
228
        clock_skew: ClockSkew,
455
228
        sleep_prov: S,
456
228
        memquota: ChannelAccount,
457
228
    ) -> Result<(Arc<Self>, reactor::Reactor<S>)>
458
228
    where
459
228
        S: CoarseTimeProvider + SleepProvider,
460
228
    {
461
        use circmap::{CircIdRange, CircMap};
462
228
        let circmap = CircMap::new(CircIdRange::High);
463
228
        let dyn_time = DynTimeProvider::new(sleep_prov.clone());
464
228

            
465
228
        let (control_tx, control_rx) = mpsc::unbounded();
466
228
        let (cell_tx, cell_rx) = mq_queue::MpscSpec::new(CHANNEL_BUFFER_SIZE)
467
228
            .new_mq(dyn_time.clone(), memquota.as_raw_account())?;
468
228
        let unused_since = AtomicOptTimestamp::new();
469
228
        unused_since.update();
470
228

            
471
228
        let mutable = MutableDetails::default();
472
228
        let (reactor_closed_tx, reactor_closed_rx) = oneshot_broadcast::channel();
473
228

            
474
228
        let details = ChannelDetails {
475
228
            unused_since,
476
228
            memquota,
477
228
        };
478
228
        let details = Arc::new(details);
479
228

            
480
228
        let channel = Arc::new(Channel {
481
228
            control: control_tx,
482
228
            cell_tx,
483
228
            reactor_closed_rx,
484
228
            unique_id,
485
228
            peer_id,
486
228
            clock_skew,
487
228
            opened_at: coarsetime::Instant::now(),
488
228
            mutable: Mutex::new(mutable),
489
228
            details: Arc::clone(&details),
490
228
        });
491

            
492
        // We start disabled; the channel manager will `reconfigure` us soon after creation.
493
228
        let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov, None)?);
494

            
495
228
        let reactor = Reactor {
496
228
            control: control_rx,
497
228
            cells: cell_rx,
498
228
            reactor_closed_tx,
499
228
            input: futures::StreamExt::fuse(stream),
500
228
            output: sink,
501
228
            streamops,
502
228
            circs: circmap,
503
228
            circ_unique_id_ctx: CircUniqIdContext::new(),
504
228
            link_protocol,
505
228
            unique_id,
506
228
            details,
507
228
            padding_timer,
508
228
            special_outgoing: Default::default(),
509
228
        };
510
228

            
511
228
        Ok((channel, reactor))
512
228
    }
513

            
514
    /// Return a process-unique identifier for this channel.
515
4
    pub fn unique_id(&self) -> UniqId {
516
4
        self.unique_id
517
4
    }
518

            
519
    /// Return a reference to the memory tracking account for this Channel
520
    pub fn mq_account(&self) -> &ChannelAccount {
521
        &self.details.memquota
522
    }
523

            
524
    /// Obtain a reference to the `Channel`'s [`DynTimeProvider`]
525
    ///
526
    /// (This can sometimes be used to avoid having to keep
527
    /// a separate clone of the time provider.)
528
160
    pub fn time_provider(&self) -> &DynTimeProvider {
529
160
        self.cell_tx.time_provider()
530
160
    }
531

            
532
    /// Return an OwnedChanTarget representing the actual handshake used to
533
    /// create this channel.
534
220
    pub fn target(&self) -> &OwnedChanTarget {
535
220
        &self.peer_id
536
220
    }
537

            
538
    /// Return the amount of time that has passed since this channel became open.
539
    pub fn age(&self) -> Duration {
540
        self.opened_at.elapsed().into()
541
    }
542

            
543
    /// Return a ClockSkew declaring how much clock skew the other side of this channel
544
    /// claimed that we had when we negotiated the connection.
545
    pub fn clock_skew(&self) -> ClockSkew {
546
        self.clock_skew
547
    }
548

            
549
    /// Send a control message
550
615
    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
551
615
        self.control
552
615
            .unbounded_send(msg)
553
639
            .map_err(|_| ChannelClosed)?;
554
567
        Ok(())
555
615
    }
556

            
557
    /// Acquire the lock on `mutable` (and handle any poison error)
558
188
    fn mutable(&self) -> MutexGuard<MutableDetails> {
559
188
        self.mutable.lock().expect("channel details poisoned")
560
188
    }
561

            
562
    /// Specify that this channel should do activities related to channel padding
563
    ///
564
    /// Initially, the channel does nothing related to channel padding:
565
    /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
566
    ///
567
    /// After this function has been called, it will do both,
568
    /// according to the parameters specified through `reparameterize`.
569
    /// Note that this might include *disabling* padding
570
    /// (for example, by sending a `PADDING_NEGOTIATE`).
571
    ///
572
    /// Idempotent.
573
    ///
574
    /// There is no way to undo the effect of this call.
575
188
    pub fn engage_padding_activities(&self) {
576
188
        let mut mutable = self.mutable();
577
188

            
578
188
        match &mutable.padding {
579
            PCS::UsageDoesNotImplyPadding {
580
188
                padding_params: params,
581
188
            } => {
582
188
                // Well, apparently the channel usage *does* imply padding now,
583
188
                // so we need to (belatedly) enable the timer,
584
188
                // send the padding negotiation cell, etc.
585
188
                let mut params = params.clone();
586
188

            
587
188
                // Except, maybe the padding we would be requesting is precisely default,
588
188
                // so we wouldn't actually want to send that cell.
589
188
                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
590
                    params.padding_negotiate = None;
591
188
                }
592

            
593
188
                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
594
188
                    Ok(()) => {}
595
                    Err(ChannelClosed) => return,
596
                }
597

            
598
188
                mutable.padding = PCS::PaddingConfigured;
599
            }
600

            
601
            PCS::PaddingConfigured => {
602
                // OK, nothing to do
603
            }
604
        }
605

            
606
188
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
607
188
    }
608

            
609
    /// Reparameterise (update parameters; reconfigure)
610
    ///
611
    /// Returns `Err` if the channel was closed earlier
612
423
    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
613
423
        let mut mutable = self
614
423
            .mutable
615
423
            .lock()
616
423
            .map_err(|_| internal!("channel details poisoned"))?;
617

            
618
423
        match &mut mutable.padding {
619
            PCS::PaddingConfigured => {
620
235
                self.send_control(CtrlMsg::ConfigUpdate(params))?;
621
            }
622
188
            PCS::UsageDoesNotImplyPadding { padding_params } => {
623
188
                padding_params.combine(&params);
624
188
            }
625
        }
626

            
627
423
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
628
423
        Ok(())
629
423
    }
630

            
631
    /// Update the KIST parameters.
632
    ///
633
    /// Returns `Err` if the channel is closed.
634
    pub fn reparameterize_kist(&self, kist_params: KistParams) -> Result<()> {
635
        Ok(self.send_control(CtrlMsg::KistConfigUpdate(kist_params))?)
636
    }
637

            
638
    /// Return an error if this channel is somehow mismatched with the
639
    /// given target.
640
30
    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
641
30
        check_id_match_helper(&self.peer_id, target)
642
30
    }
643

            
644
    /// Return true if this channel is closed and therefore unusable.
645
95
    pub fn is_closing(&self) -> bool {
646
95
        self.reactor_closed_rx.is_ready()
647
95
    }
648

            
649
    /// If the channel is not in use, return the amount of time
650
    /// it has had with no circuits.
651
    ///
652
    /// Return `None` if the channel is currently in use.
653
167
    pub fn duration_unused(&self) -> Option<std::time::Duration> {
654
167
        self.details
655
167
            .unused_since
656
167
            .time_since_update()
657
167
            .map(Into::into)
658
167
    }
659

            
660
    /// Return a new [`ChannelSender`] to transmit cells on this channel.
661
184
    pub(crate) fn sender(&self) -> ChannelSender {
662
184
        ChannelSender {
663
184
            cell_tx: self.cell_tx.clone(),
664
184
            reactor_closed_rx: self.reactor_closed_rx.clone(),
665
184
            unique_id: self.unique_id,
666
184
        }
667
184
    }
668

            
669
    /// Return a newly allocated PendingClientCirc object with
670
    /// a corresponding circuit reactor. A circuit ID is allocated, but no
671
    /// messages are sent, and no cryptography is done.
672
    ///
673
    /// To use the results of this method, call Reactor::run() in a
674
    /// new task, then use the methods of
675
    /// [crate::tunnel::circuit::PendingClientCirc] to build the circuit.
676
8
    pub async fn new_circ(
677
8
        self: &Arc<Self>,
678
12
    ) -> Result<(circuit::PendingClientCirc, tunnel::reactor::Reactor)> {
679
8
        if self.is_closing() {
680
            return Err(ChannelClosed.into());
681
8
        }
682
8

            
683
8
        let time_prov = self.cell_tx.time_provider().clone();
684
8
        let memquota = CircuitAccount::new(&self.details.memquota)?;
685

            
686
        // TODO: blocking is risky, but so is unbounded.
687
8
        let (sender, receiver) =
688
8
            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
689
8
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
690
8

            
691
8
        let (tx, rx) = oneshot::channel();
692
8
        self.send_control(CtrlMsg::AllocateCircuit {
693
8
            created_sender: createdsender,
694
8
            sender,
695
8
            tx,
696
8
        })?;
697
8
        let (id, circ_unique_id) = rx.await.map_err(|_| ChannelClosed)??;
698

            
699
8
        trace!("{}: Allocated CircId {}", circ_unique_id, id);
700

            
701
8
        Ok(circuit::PendingClientCirc::new(
702
8
            id,
703
8
            self.clone(),
704
8
            createdreceiver,
705
8
            receiver,
706
8
            circ_unique_id,
707
8
            time_prov,
708
8
            memquota,
709
8
        ))
710
8
    }
711

            
712
    /// Shut down this channel immediately, along with all circuits that
713
    /// are using it.
714
    ///
715
    /// Note that other references to this channel may exist.  If they
716
    /// do, they will stop working after you call this function.
717
    ///
718
    /// It's not necessary to call this method if you're just done
719
    /// with a channel: the channel should close on its own once nothing
720
    /// is using it any more.
721
24
    pub fn terminate(&self) {
722
24
        let _ = self.send_control(CtrlMsg::Shutdown);
723
24
    }
724

            
725
    /// Tell the reactor that the circuit with the given ID has gone away.
726
160
    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
727
160
        self.send_control(CtrlMsg::CloseCircuit(circid))?;
728
112
        Ok(())
729
160
    }
730

            
731
    /// Return a future that will resolve once this channel has closed.
732
    ///
733
    /// Note that this method does not _cause_ the channel to shut down on its own.
734
24
    pub fn wait_for_close(
735
24
        &self,
736
24
    ) -> impl Future<Output = StdResult<CloseInfo, ClosedUnexpectedly>> + Send + Sync + 'static
737
24
    {
738
24
        self.reactor_closed_rx
739
24
            .clone()
740
24
            .into_future()
741
32
            .map(|recv| match recv {
742
8
                Ok(Ok(info)) => Ok(info),
743
8
                Ok(Err(e)) => Err(ClosedUnexpectedly::ReactorError(e)),
744
8
                Err(oneshot_broadcast::SenderDropped) => Err(ClosedUnexpectedly::ReactorDropped),
745
36
            })
746
24
    }
747

            
748
    /// Make a new fake reactor-less channel.  For testing only, obviously.
749
    ///
750
    /// Returns the receiver end of the control message mpsc.
751
    ///
752
    /// Suitable for external callers who want to test behaviour
753
    /// of layers including the logic in the channel frontend
754
    /// (`Channel` object methods).
755
    //
756
    // This differs from test::fake_channel as follows:
757
    //  * It returns the mpsc Receiver
758
    //  * It does not require explicit specification of details
759
    #[cfg(feature = "testing")]
760
188
    pub fn new_fake() -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
761
188
        let (control, control_recv) = mpsc::unbounded();
762
188
        let details = fake_channel_details();
763
188

            
764
188
        let unique_id = UniqId::new();
765
188
        let peer_id = OwnedChanTarget::builder()
766
188
            .ed_identity([6_u8; 32].into())
767
188
            .rsa_identity([10_u8; 20].into())
768
188
            .build()
769
188
            .expect("Couldn't construct peer id");
770
188

            
771
188
        // This will make rx trigger immediately.
772
188
        let (_tx, rx) = oneshot_broadcast::channel();
773
188

            
774
188
        let channel = Channel {
775
188
            control,
776
188
            cell_tx: fake_mpsc().0,
777
188
            reactor_closed_rx: rx,
778
188
            unique_id,
779
188
            peer_id,
780
188
            clock_skew: ClockSkew::None,
781
188
            opened_at: coarsetime::Instant::now(),
782
188
            mutable: Default::default(),
783
188
            details,
784
188
        };
785
188
        (channel, control_recv)
786
188
    }
787
}
788

            
789
/// If there is any identity in `wanted_ident` that is not present in
790
/// `my_ident`, return a ChanMismatch error.
791
///
792
/// This is a helper for [`Channel::check_match`] and
793
/// [`UnverifiedChannel::check_internal`].
794
38
fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
795
38
where
796
38
    T: HasRelayIds + ?Sized,
797
38
    U: HasRelayIds + ?Sized,
798
38
{
799
54
    for desired in wanted_ident.identities() {
800
54
        let id_type = desired.id_type();
801
54
        match my_ident.identity(id_type) {
802
54
            Some(actual) if actual == desired => {}
803
8
            Some(actual) => {
804
8
                return Err(Error::ChanMismatch(format!(
805
8
                    "Identity {} does not match target {}",
806
8
                    sv(actual),
807
8
                    sv(desired)
808
8
                )));
809
            }
810
            None => {
811
                return Err(Error::ChanMismatch(format!(
812
                    "Peer does not have {} identity",
813
                    id_type
814
                )))
815
            }
816
        }
817
    }
818
30
    Ok(())
819
38
}
820

            
821
impl HasRelayIds for Channel {
822
799
    fn identity(
823
799
        &self,
824
799
        key_type: tor_linkspec::RelayIdType,
825
799
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
826
799
        self.peer_id.identity(key_type)
827
799
    }
828
}
829

            
830
/// Status information for a channel that closed successfully.
///
/// **Note:** There is currently no data attached to this type,
/// though some may be added in the future.
// Nothing obvious comes to mind that waiters would want to receive, but
// having a dedicated type means we can add fields later without making any
// backwards-incompatible changes.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CloseInfo;
839

            
840
/// The status of a channel which closed unexpectedly.
841
#[derive(Clone, Debug, thiserror::Error)]
842
#[non_exhaustive]
843
pub enum ClosedUnexpectedly {
844
    /// The channel reactor was dropped or panicked before completing.
845
    #[error("channel reactor was dropped or panicked before completing")]
846
    ReactorDropped,
847
    /// The channel reactor had an internal error.
848
    #[error("channel reactor had an internal error")]
849
    ReactorError(Error),
850
}
851

            
852
/// Build a placeholder [`ChannelDetails`] wrapped in an [`Arc`] (for testing only!)
///
/// The `unused_since` timestamp is freshly constructed, and the memory-quota
/// field comes from `crate::util::fake_mq()`.
#[cfg(any(test, feature = "testing"))]
fn fake_channel_details() -> Arc<ChannelDetails> {
    let details = ChannelDetails {
        unused_since: AtomicOptTimestamp::new(),
        memquota: crate::util::fake_mq(),
    };

    Arc::new(details)
}
862

            
863
/// Build a fake MPSC queue with the same type that Channels use for cells; testing only.
///
/// This just calls `crate::fake_mpsc`, passing it `CHANNEL_BUFFER_SIZE`.
#[cfg(any(test, feature = "testing"))] // Used by Channel::new_fake which is also feature=testing
pub(crate) fn fake_mpsc() -> (
    mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
    mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
) {
    let buffer_size = CHANNEL_BUFFER_SIZE;
    crate::fake_mpsc(buffer_size)
}
871

            
872
#[cfg(test)]
873
pub(crate) mod test {
874
    // Most of this module is tested via tests that also check on the
875
    // reactor code; there are just a few more cases to examine here.
876
    #![allow(clippy::unwrap_used)]
877
    use super::*;
878
    use crate::channel::codec::test::MsgBuf;
879
    pub(crate) use crate::channel::reactor::test::new_reactor;
880
    use crate::util::fake_mq;
881
    use tor_cell::chancell::msg::HandshakeType;
882
    use tor_cell::chancell::{msg, AnyChanCell};
883
    use tor_rtcompat::PreferredRuntime;
884

            
885
    /// Make a new fake reactor-less channel.  For testing only, obviously.
886
    pub(crate) fn fake_channel(details: Arc<ChannelDetails>) -> Channel {
887
        let unique_id = UniqId::new();
888
        let peer_id = OwnedChanTarget::builder()
889
            .ed_identity([6_u8; 32].into())
890
            .rsa_identity([10_u8; 20].into())
891
            .build()
892
            .expect("Couldn't construct peer id");
893
        // This will make rx trigger immediately.
894
        let (_tx, rx) = oneshot_broadcast::channel();
895
        Channel {
896
            control: mpsc::unbounded().0,
897
            cell_tx: fake_mpsc().0,
898
            reactor_closed_rx: rx,
899
            unique_id,
900
            peer_id,
901
            clock_skew: ClockSkew::None,
902
            opened_at: coarsetime::Instant::now(),
903
            mutable: Default::default(),
904
            details,
905
        }
906
    }
907

            
908
    /// The sender's `check_cell` must reject cells that may not be sent on an
    /// open client channel, and accept ones that may.
    #[test]
    fn send_bad() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            use std::error::Error as _;
            let chan = fake_channel(fake_channel_details());

            // Check that `cell` is rejected, and render the source of the
            // resulting error as text.
            let rejection_cause = |cell: &AnyChanCell| {
                let outcome = chan.sender().check_cell(cell);
                assert!(outcome.is_err());
                outcome.unwrap_err().source().unwrap().to_string()
            };

            let created2 =
                AnyChanCell::new(CircId::new(7), msg::Created2::new(&b"hihi"[..]).into());
            let cause = rejection_cause(&created2);
            assert!(cause.contains("Can't send CREATED2 cell on client channel"));

            let certs = AnyChanCell::new(None, msg::Certs::new_empty().into());
            let cause = rejection_cause(&certs);
            assert!(cause.contains("Can't send CERTS cell after handshake is done"));

            let create2 = AnyChanCell::new(
                CircId::new(5),
                msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
            );
            let outcome = chan.sender().check_cell(&create2);
            assert!(outcome.is_ok());
            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
            // let got = output.next().await.unwrap();
            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
        });
    }
936

            
937
    /// Smoke test: a `ChannelBuilder` with a declared method can be launched
    /// over an (empty) in-memory stream.
    #[test]
    fn chanbuilder() {
        let runtime = PreferredRuntime::create().unwrap();
        let mut builder = ChannelBuilder::default();

        let addr = "127.0.0.1:9001".parse().unwrap();
        let method = tor_linkspec::ChannelMethod::Direct(vec![addr]);
        builder.set_declared_method(method);

        let stream = MsgBuf::new(&b""[..]);
        let _outbound = builder.launch(stream, runtime, fake_mq());
    }
947

            
948
    /// `Channel::check_match` accepts a target whose identities equal the fake
    /// channel's, and rejects targets whose identities differ.
    #[test]
    fn check_match() {
        let chan = fake_channel(fake_channel_details());

        // Build a target whose ed25519 identity is 32 copies of `ed`, and whose
        // RSA identity is 20 copies of `rsa`.
        let target = |ed: u8, rsa: u8| {
            OwnedChanTarget::builder()
                .ed_identity([ed; 32].into())
                .rsa_identity([rsa; 20].into())
                .build()
                .unwrap()
        };

        // Same identities as the ones fake_channel() installs.
        let same = target(6, 10);
        let other_a = target(1, 3);
        let other_b = target(3, 2);

        assert!(chan.check_match(&same).is_ok());
        assert!(chan.check_match(&other_a).is_err());
        assert!(chan.check_match(&other_b).is_err());
    }
972

            
973
    /// Two separately constructed channels must not report the same unique id.
    #[test]
    fn unique_id() {
        let first = fake_channel(fake_channel_details());
        let second = fake_channel(fake_channel_details());

        let (id_a, id_b) = (first.unique_id(), second.unique_id());
        assert_ne!(id_a, id_b);
    }
979

            
980
    /// After `unused_since` has been updated on the shared details,
    /// the channel reports some unused duration.
    #[test]
    fn duration_unused_at() {
        let shared = fake_channel_details();
        let chan = fake_channel(Arc::clone(&shared));

        shared.unused_since.update();

        let unused = chan.duration_unused();
        assert!(unused.is_some());
    }
987
}